Coverage for src / tracekit / utils / memory_advanced.py: 67%

478 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Advanced memory management utilities. 

2 

3This module provides advanced memory management features including 

4quality modes, cache management, garbage collection, and streaming 

5backpressure support. 

6""" 

7 

8from __future__ import annotations 

9 

10import contextlib 

11import gc 

12import hashlib 

13import json 

14import logging 

15import pickle 

16import tempfile 

17import threading 

18import time 

19from collections import OrderedDict 

20from dataclasses import dataclass 

21from enum import Enum 

22from pathlib import Path 

23from typing import TYPE_CHECKING, Any, TypeVar 

24 

25import numpy as np 

26 

27if TYPE_CHECKING: 

28 from collections.abc import Iterator 

29 

30logger = logging.getLogger(__name__) 

31 

# Public API. FIX: previously omitted several documented public names
# (set_quality_mode / get_quality_mode / get_quality_config / force_gc /
# MemoryLogEntry), so `from ... import *` silently skipped them.
__all__ = [
    "AdaptiveMeasurementSelector",
    "BackpressureController",
    "CacheEntry",
    "CacheInvalidationStrategy",
    "DiskCache",
    "GCController",
    "MemoryLogEntry",
    "MemoryLogger",
    "MultiChannelMemoryManager",
    "QualityMode",
    "QualityModeConfig",
    "WSLSwapChecker",
    "adaptive_measurements",
    "force_gc",
    "gc_aggressive",
    "get_quality_config",
    "get_quality_mode",
    "get_wsl_memory_limits",
    "set_quality_mode",
]

48 

49 

50# ============================================================================= 

51# ============================================================================= 

52 

53 

class QualityMode(Enum):
    """Processing-quality presets for memory-constrained scenarios.

    References:
        MEM-014: Quality vs Memory Trade-offs
    """

    PREVIEW = "preview"  # Fastest, approximate results, minimal memory
    BALANCED = "balanced"  # Default trade-off between speed and accuracy
    HIGH_QUALITY = "high"  # Most accurate; may need substantially more memory

64 

65 

@dataclass
class QualityModeConfig:
    """Tunable processing parameters associated with a quality mode.

    Attributes:
        mode: Quality mode
        downsample_factor: Downsampling factor for preview mode
        nfft_factor: FFT size reduction factor
        overlap_factor: Overlap reduction factor
        enable_caching: Enable intermediate caching
        use_approximations: Use faster approximations

    References:
        MEM-014: Quality vs Memory Trade-offs
    """

    mode: QualityMode = QualityMode.BALANCED
    downsample_factor: int = 1
    nfft_factor: float = 1.0
    overlap_factor: float = 1.0
    enable_caching: bool = True
    use_approximations: bool = False

    @classmethod
    def for_mode(cls, mode: QualityMode | str) -> QualityModeConfig:
        """Get configuration for specified quality mode.

        Args:
            mode: Quality mode (enum member or its string value)

        Returns:
            Appropriate configuration
        """
        if isinstance(mode, str):
            mode = QualityMode(mode.lower())

        # Per-mode parameter overrides; BALANCED (the dataclass defaults)
        # is the fallback for any mode without an explicit preset.
        presets: dict[QualityMode, dict[str, Any]] = {
            QualityMode.PREVIEW: {
                "downsample_factor": 8,
                "nfft_factor": 0.25,
                "overlap_factor": 0.5,
                "enable_caching": False,
                "use_approximations": True,
            },
            QualityMode.HIGH_QUALITY: {
                "downsample_factor": 1,
                "nfft_factor": 2.0,
                "overlap_factor": 1.5,
                "enable_caching": True,
                "use_approximations": False,
            },
        }
        return cls(mode=mode, **presets.get(mode, {}))

129 

130 

# Global quality mode
# Module-wide default; read by get_quality_mode()/get_quality_config()
# and mutated only via set_quality_mode().
_current_quality_mode = QualityMode.BALANCED

133 

134 

def set_quality_mode(mode: QualityMode | str) -> None:
    """Set the module-wide quality mode.

    Args:
        mode: Quality mode to use (enum member or its string value)
    """
    global _current_quality_mode
    resolved = QualityMode(mode.lower()) if isinstance(mode, str) else mode
    _current_quality_mode = resolved
    logger.info(f"Quality mode set to: {resolved.value}")

146 

147 

def get_quality_mode() -> QualityMode:
    """Return the currently active module-wide quality mode."""
    return _current_quality_mode

151 

152 

def get_quality_config() -> QualityModeConfig:
    """Return the configuration matching the current quality mode."""
    return QualityModeConfig.for_mode(_current_quality_mode)

156 

157 

158# ============================================================================= 

159# ============================================================================= 

160 

161 

class GCController:
    """Garbage collection controller.

    Controls when and how garbage collection occurs based on
    memory pressure and operation completion.

    References:
        MEM-020: Garbage Collection Triggers
    """

    def __init__(self, aggressive: bool = False) -> None:
        """Initialize GC controller.

        Args:
            aggressive: Enable aggressive GC mode
        """
        self._aggressive = aggressive
        self._collection_count: int = 0
        # NOTE(review): despite the name, gc.collect() returns a count of
        # unreachable *objects*, not bytes — this accumulates object counts.
        # Confirm before relying on the "bytes_collected" stats key.
        self._bytes_collected: int = 0

    @property
    def aggressive(self) -> bool:
        """Check if aggressive mode enabled."""
        return self._aggressive

    @aggressive.setter
    def aggressive(self, value: bool) -> None:
        """Set aggressive mode."""
        self._aggressive = value

    def collect(self) -> int:
        """Perform garbage collection.

        Returns:
            Number of objects collected
        """
        if self._aggressive:
            # Full collection with all generations
            collected = gc.collect(generation=2)
        else:
            # Standard collection
            collected = gc.collect()

        self._collection_count += 1
        self._bytes_collected += collected
        logger.debug(f"GC collected {collected} objects")
        return collected

    def collect_after_operation(self) -> None:
        """Collect garbage after a large operation."""
        # No-op unless aggressive mode is on; counters are not updated here.
        if self._aggressive:
            # Force immediate collection
            gc.collect()
            gc.collect()  # Second pass for circular references

    def get_stats(self) -> dict[str, Any]:
        """Get GC statistics.

        Returns:
            Dict with GC statistics
        """
        return {
            "collection_count": self._collection_count,
            "bytes_collected": self._bytes_collected,
            "aggressive_mode": self._aggressive,
            "gc_threshold": gc.get_threshold(),
            "gc_count": gc.get_count(),
        }

230 

231 

# Global GC controller
# Module-wide singleton driven by gc_aggressive() and force_gc().
_gc_controller = GCController()

234 

235 

def gc_aggressive(enable: bool = True) -> None:
    """Toggle aggressive garbage collection on the module-wide controller.

    Args:
        enable: Whether to enable aggressive GC
    """
    _gc_controller.aggressive = enable

243 

244 

def force_gc() -> int:
    """Trigger a collection via the module-wide GC controller.

    Returns:
        Number of objects collected
    """
    return _gc_controller.collect()

252 

253 

254# ============================================================================= 

255# ============================================================================= 

256 

257 

class WSLSwapChecker:
    """WSL swap availability checker.

    Detects WSL environment and applies conservative memory
    estimates accounting for limited swap.

    References:
        MEM-023: WSL Swap Awareness
    """

    def __init__(self) -> None:
        """Initialize WSL checker."""
        self._is_wsl = self._detect_wsl()
        # .wslconfig is parsed lazily on the first limit query; these
        # fields cache the parse result.
        self._wslconfig_parsed = False
        self._wslconfig_memory: int | None = None
        self._wslconfig_swap: int | None = None

    def _detect_wsl(self) -> bool:
        """Detect if running in WSL."""
        # WSL kernels identify themselves in /proc/version.
        try:
            with open("/proc/version") as f:
                version = f.read().lower()
                return "microsoft" in version or "wsl" in version
        except FileNotFoundError:
            # No /proc/version (e.g. native Windows/macOS): not WSL.
            return False

    @property
    def is_wsl(self) -> bool:
        """Check if running in WSL."""
        return self._is_wsl

    def get_wsl_memory_limit(self) -> int | None:
        """Get WSL memory limit from .wslconfig if available.

        Returns:
            Memory limit in bytes, or None if not configured
        """
        if not self._is_wsl:
            return None

        if self._wslconfig_parsed:
            return self._wslconfig_memory

        # Try to parse .wslconfig
        wslconfig_path = Path.home() / ".wslconfig"
        if wslconfig_path.exists():
            try:
                content = wslconfig_path.read_text()
                for line in content.split("\n"):
                    if line.strip().lower().startswith("memory="):
                        value = line.split("=")[1].strip()
                        self._wslconfig_memory = self._parse_size(value)
                    elif line.strip().lower().startswith("swap="):
                        value = line.split("=")[1].strip()
                        self._wslconfig_swap = self._parse_size(value)
            except Exception as e:
                # Best-effort: a malformed .wslconfig must not break detection.
                logger.warning(f"Failed to parse .wslconfig: {e}")

        # Mark parsed even on failure so we do not re-read every call.
        self._wslconfig_parsed = True
        return self._wslconfig_memory

    def get_wsl_swap_limit(self) -> int | None:
        """Get WSL swap limit.

        Returns:
            Swap limit in bytes, or None if not configured
        """
        if not self._is_wsl:
            return None

        if not self._wslconfig_parsed:
            # Memory and swap are parsed together; trigger the parse.
            self.get_wsl_memory_limit()

        return self._wslconfig_swap

    def _parse_size(self, size_str: str) -> int:
        """Parse size string like '8GB' to bytes."""
        size_str = size_str.strip().upper()
        multipliers = {
            "K": 1024,
            "KB": 1024,
            "M": 1024**2,
            "MB": 1024**2,
            "G": 1024**3,
            "GB": 1024**3,
            "T": 1024**4,
            "TB": 1024**4,
        }

        # endswith() keeps single-letter and two-letter suffixes distinct
        # because "8GB" only matches "GB", never "K"/"M"/"G".
        for suffix, mult in multipliers.items():
            if size_str.endswith(suffix):
                num = float(size_str[: -len(suffix)])
                return int(num * mult)

        # No recognized suffix: assume the value is already in bytes.
        # A non-numeric value raises ValueError for the caller to handle.
        return int(size_str)

    def get_safe_memory(self) -> int:
        """Get safe memory limit for WSL.

        Returns:
            Recommended maximum memory to use
        """
        if not self._is_wsl:
            import psutil

            # Non-WSL host: 80% of currently available memory.
            return int(psutil.virtual_memory().available * 0.8)

        # WSL has minimal swap by default - use physical RAM only
        import psutil

        total = psutil.virtual_memory().total

        # Check .wslconfig limit
        wsl_limit = self.get_wsl_memory_limit()
        if wsl_limit is not None:
            total = min(total, wsl_limit)

        # Use 50% of available for safety (WSL can be unpredictable)
        available = psutil.virtual_memory().available
        return min(int(available * 0.5), int(total * 0.5))

378 

379 

def get_wsl_memory_limits() -> dict[str, int | None]:
    """Report WSL memory limits.

    Returns:
        Dict with the WSL detection flag plus memory, swap, and
        safe-memory limits (limits are None when not configured).
    """
    checker = WSLSwapChecker()
    limits: dict[str, int | None] = {
        "is_wsl": checker.is_wsl,
        "memory_limit": checker.get_wsl_memory_limit(),
        "swap_limit": checker.get_wsl_swap_limit(),
        "safe_memory": checker.get_safe_memory(),
    }
    return limits

393 

394 

395# ============================================================================= 

396# ============================================================================= 

397 

398 

@dataclass
class MemoryLogEntry:
    """Single memory log entry.

    Attributes:
        timestamp: Entry timestamp
        operation: Operation name
        memory_used: Memory used in bytes
        memory_peak: Peak memory
        duration: Operation duration in seconds
    """

    timestamp: float  # time.time() at the moment the entry was recorded
    operation: str  # free-form operation name supplied by the caller
    memory_used: int  # process RSS in bytes at log time
    memory_peak: int  # running peak RSS in bytes up to this entry
    duration: float  # operation duration in seconds (0.0 if unknown)

416 

417 

class MemoryLogger:
    """Memory usage logger for debugging.

    Logs memory usage at each operation for analysis.

    References:
        MEM-025: Memory Usage Logging
    """

    def __init__(
        self,
        log_file: str | Path | None = None,
        format: str = "csv",
    ) -> None:
        """Initialize memory logger.

        Args:
            log_file: Output file path (no file output when None)
            format: Output format ('csv' or 'json')
        """
        self._log_file = Path(log_file) if log_file else None
        self._format = format
        self._entries: list[MemoryLogEntry] = []
        self._enabled = False
        self._peak_memory: int = 0
        self._start_memory: int = 0
        # Guards _entries; the peak/start counters are written without it.
        self._lock = threading.Lock()

    def enable(self) -> None:
        """Enable memory logging."""
        self._enabled = True
        import psutil

        # Baseline RSS; peak tracking starts from here.
        process = psutil.Process()
        self._start_memory = process.memory_info().rss
        self._peak_memory = self._start_memory
        logger.info("Memory logging enabled")

    def disable(self) -> None:
        """Disable memory logging."""
        self._enabled = False
        # Persist anything still buffered.
        self.flush()

    def log_operation(
        self,
        operation: str,
        duration: float = 0.0,
    ) -> None:
        """Log memory for an operation.

        Args:
            operation: Operation name
            duration: Operation duration
        """
        if not self._enabled:
            return

        import psutil

        process = psutil.Process()
        memory_used = process.memory_info().rss
        self._peak_memory = max(self._peak_memory, memory_used)

        entry = MemoryLogEntry(
            timestamp=time.time(),
            operation=operation,
            memory_used=memory_used,
            memory_peak=self._peak_memory,
            duration=duration,
        )

        with self._lock:
            self._entries.append(entry)

    def flush(self) -> None:
        """Write log to file."""
        # NOTE(review): this guard reads _entries outside the lock; an
        # entry appended between the check and the copy is still captured
        # because the copy-and-clear below happens under the lock.
        if self._log_file is None or not self._entries:
            return

        with self._lock:
            entries = self._entries.copy()
            self._entries.clear()

        if self._format == "csv":
            self._write_csv(entries)
        else:
            self._write_json(entries)

    def _write_csv(self, entries: list[MemoryLogEntry]) -> None:
        """Write entries as CSV, appending across flushes."""
        import csv

        assert self._log_file is not None
        # Append if the file already exists; write the header only on create.
        mode = "a" if self._log_file.exists() else "w"
        with open(self._log_file, mode, newline="") as f:
            writer = csv.writer(f)
            if mode == "w":
                writer.writerow(
                    ["timestamp", "operation", "memory_used", "memory_peak", "duration"]
                )
            for entry in entries:
                writer.writerow(
                    [
                        entry.timestamp,
                        entry.operation,
                        entry.memory_used,
                        entry.memory_peak,
                        entry.duration,
                    ]
                )

    def _write_json(self, entries: list[MemoryLogEntry]) -> None:
        """Write entries as JSON, one object per line (JSON-lines), appended."""
        assert self._log_file is not None
        data = [
            {
                "timestamp": e.timestamp,
                "operation": e.operation,
                "memory_used": e.memory_used,
                "memory_peak": e.memory_peak,
                "duration": e.duration,
            }
            for e in entries
        ]

        with open(self._log_file, "a") as f:
            f.writelines(json.dumps(entry) + "\n" for entry in data)

    def get_summary(self) -> dict[str, Any]:
        """Get logging summary.

        Returns:
            Summary statistics
        """
        return {
            # Counts only entries not yet flushed (flush() clears the buffer).
            "entries_logged": len(self._entries),
            "peak_memory_bytes": self._peak_memory,
            "start_memory_bytes": self._start_memory,
            "memory_growth_bytes": self._peak_memory - self._start_memory,
            "enabled": self._enabled,
        }

559 

560 

561# ============================================================================= 

562# ============================================================================= 

563 

564 

class AdaptiveMeasurementSelector:
    """Adaptive measurement selection for large files.

    Disables memory-intensive measurements for very large files
    and suggests alternatives.

    References:
        MEM-028: Adaptive Measurement Selection
    """

    # Default size thresholds (in samples) above which a measurement
    # is disabled unless enable_all is set.
    THRESHOLDS = {  # noqa: RUF012
        "eye_diagram": 1e8,  # 100M samples
        "spectrogram": 5e8,  # 500M samples
        "full_correlation": 1e9,  # 1B samples
        "wavelet": 2e8,  # 200M samples
    }

    def __init__(
        self,
        file_size_samples: int,
        enable_all: bool = False,
    ) -> None:
        """Initialize selector.

        Args:
            file_size_samples: Number of samples in file
            enable_all: Override to enable all measurements
        """
        self._size = file_size_samples
        self._enable_all = enable_all

    def is_enabled(self, measurement: str) -> bool:
        """Check if measurement is enabled for current file size.

        Measurements without a configured threshold are always enabled.

        Args:
            measurement: Measurement name

        Returns:
            True if measurement should be enabled
        """
        if self._enable_all:
            return True

        threshold = self.THRESHOLDS.get(measurement, float("inf"))
        return self._size < threshold

    def get_recommendations(self) -> dict[str, str]:
        """Get recommendations for disabled measurements.

        Every measurement whose threshold is exceeded gets an entry;
        measurements without a tailored message get a generic one.

        Returns:
            Dict mapping disabled measurement to recommendation
        """
        recommendations: dict[str, str] = {}

        for measurement, threshold in self.THRESHOLDS.items():
            if self._size < threshold:
                continue
            size_gb = self._size * 8 / 1e9  # Assume float64
            if measurement == "eye_diagram":
                recommendations[measurement] = (
                    f"File size ({size_gb:.1f} GB) exceeds threshold. "
                    f"Use --roi START:END to specify time range, or "
                    f"--enable-all to force processing."
                )
            elif measurement == "spectrogram":
                recommendations[measurement] = (
                    "File too large for full spectrogram. "
                    "Use chunked_spectrogram() or downsample data."
                )
            elif measurement == "full_correlation":
                recommendations[measurement] = (
                    f"Correlation on {size_gb:.1f} GB requires chunked approach. "
                    f"Use correlate_chunked() instead."
                )
            else:
                # BUG FIX: "wavelet" (and any future thresholded measurement)
                # was previously omitted from the recommendations entirely.
                recommendations[measurement] = (
                    f"File size ({size_gb:.1f} GB) exceeds the {measurement} "
                    f"threshold. Use --roi to restrict the range, or "
                    f"--enable-all to force processing."
                )

        return recommendations

641 

642 

def adaptive_measurements(
    samples: int,
    enable_all: bool = False,
) -> AdaptiveMeasurementSelector:
    """Build an adaptive measurement selector for a file of the given size.

    Args:
        samples: Number of samples
        enable_all: Override to enable all

    Returns:
        Selector instance
    """
    selector = AdaptiveMeasurementSelector(samples, enable_all)
    return selector

657 

658 

659# ============================================================================= 

660# ============================================================================= 

661 

662 

# NOTE(review): CacheEntry below declares its own [T] type parameter
# (PEP 695), so this module-level TypeVar appears unused in the visible
# code — confirm usage elsewhere in the file before removing.
T = TypeVar("T")

664 

665 

666@dataclass 

667class CacheEntry[T]: 

668 """Cache entry with metadata. 

669 

670 Attributes: 

671 key: Cache key 

672 value: Cached value 

673 created_at: Creation timestamp 

674 accessed_at: Last access timestamp 

675 source_hash: Hash of source data 

676 params_hash: Hash of parameters 

677 ttl_seconds: Time-to-live in seconds 

678 

679 References: 

680 MEM-030: Cache Invalidation Strategy 

681 """ 

682 

683 key: str 

684 value: T 

685 created_at: float 

686 accessed_at: float 

687 source_hash: str 

688 params_hash: str 

689 ttl_seconds: float = 3600.0 # 1 hour default 

690 

691 @property 

692 def is_expired(self) -> bool: 

693 """Check if entry has expired.""" 

694 if self.ttl_seconds <= 0: 694 ↛ 695line 694 didn't jump to line 695 because the condition on line 694 was never true

695 return False 

696 return time.time() - self.created_at > self.ttl_seconds 

697 

698 @property 

699 def age_seconds(self) -> float: 

700 """Get entry age in seconds.""" 

701 return time.time() - self.created_at 

702 

703 

class CacheInvalidationStrategy:
    """Cache invalidation strategy manager.

    Manages an LRU cache whose entries are invalidated on source-data
    change, parameter change, or TTL expiry.

    References:
        MEM-030: Cache Invalidation Strategy
    """

    def __init__(
        self,
        max_size: int = 1000,
        default_ttl: float = 3600.0,
    ) -> None:
        """Initialize cache.

        Args:
            max_size: Maximum entries
            default_ttl: Default TTL in seconds
        """
        self._cache: OrderedDict[str, CacheEntry[Any]] = OrderedDict()
        self._max_size = max_size
        self._default_ttl = default_ttl
        self._lock = threading.Lock()
        self._hits = 0
        self._misses = 0

    def _compute_hash(self, data: Any) -> str:
        """Compute a cheap content hash of data for change detection.

        Arrays hash only the first 1 KiB of their raw buffer, trading
        exactness for speed on large inputs.
        """
        if isinstance(data, np.ndarray):
            return hashlib.md5(data.tobytes()[:1024]).hexdigest()
        elif isinstance(data, dict | list):
            return hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()
        else:
            return hashlib.md5(str(data).encode()).hexdigest()

    def _evict_locked(self, key: str) -> None:
        """Drop an entry and count a miss. Caller must hold the lock."""
        del self._cache[key]
        self._misses += 1

    def get(
        self,
        key: str,
        source_data: Any = None,
        params: dict[str, Any] | None = None,
    ) -> tuple[Any, bool]:
        """Get value from cache.

        Args:
            key: Cache key
            source_data: Source data to validate against
            params: Parameters to validate against

        Returns:
            (value, hit) tuple
        """
        with self._lock:
            if key not in self._cache:
                self._misses += 1
                return None, False

            entry = self._cache[key]

            # Invalidate on TTL expiry.
            if entry.is_expired:
                self._evict_locked(key)
                return None, False

            # Invalidate when the source data changed since caching.
            if source_data is not None and self._compute_hash(source_data) != entry.source_hash:
                self._evict_locked(key)
                return None, False

            # Invalidate when the parameters changed since caching.
            if params is not None and self._compute_hash(params) != entry.params_hash:
                self._evict_locked(key)
                return None, False

            # Hit: refresh access time and LRU position.
            entry.accessed_at = time.time()
            self._cache.move_to_end(key)
            self._hits += 1
            return entry.value, True

    def set(
        self,
        key: str,
        value: Any,
        source_data: Any = None,
        params: dict[str, Any] | None = None,
        ttl: float | None = None,
    ) -> None:
        """Set cache value.

        Args:
            key: Cache key
            value: Value to cache
            source_data: Source data for invalidation
            params: Parameters for invalidation
            ttl: Time-to-live (uses default if None)
        """
        with self._lock:
            # BUG FIX: evict only when inserting a NEW key would grow the
            # cache past capacity. Previously, replacing an existing key at
            # capacity evicted an unrelated LRU entry even though the size
            # did not change.
            if key not in self._cache:
                while len(self._cache) >= self._max_size:
                    self._cache.popitem(last=False)

            now = time.time()
            self._cache[key] = CacheEntry(
                key=key,
                value=value,
                created_at=now,
                accessed_at=now,
                source_hash=self._compute_hash(source_data) if source_data is not None else "",
                params_hash=self._compute_hash(params) if params is not None else "",
                ttl_seconds=ttl if ttl is not None else self._default_ttl,
            )
            # A freshly (re)written entry counts as most recently used.
            self._cache.move_to_end(key)

    def invalidate(self, key: str) -> bool:
        """Invalidate specific cache entry.

        Args:
            key: Key to invalidate

        Returns:
            True if entry existed
        """
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                return True
            return False

    def invalidate_by_source(self, source_data: Any) -> int:
        """Invalidate all entries with matching source.

        Args:
            source_data: Source data to match

        Returns:
            Number of entries invalidated
        """
        source_hash = self._compute_hash(source_data)
        with self._lock:
            keys_to_remove = [k for k, v in self._cache.items() if v.source_hash == source_hash]
            for key in keys_to_remove:
                del self._cache[key]
            return len(keys_to_remove)

    def clear(self) -> int:
        """Clear all cache entries.

        Returns:
            Number of entries cleared
        """
        with self._lock:
            count = len(self._cache)
            self._cache.clear()
            return count

    def cleanup_expired(self) -> int:
        """Remove expired entries.

        Returns:
            Number of entries removed
        """
        with self._lock:
            expired = [k for k, v in self._cache.items() if v.is_expired]
            for key in expired:
                del self._cache[key]
            return len(expired)

    def get_stats(self) -> dict[str, Any]:
        """Get cache statistics.

        Returns:
            Cache statistics
        """
        with self._lock:
            total = self._hits + self._misses
            return {
                "size": len(self._cache),
                "max_size": self._max_size,
                "hits": self._hits,
                "misses": self._misses,
                "hit_rate": self._hits / total if total > 0 else 0,
                "default_ttl": self._default_ttl,
            }

900 

901 

902# ============================================================================= 

903# ============================================================================= 

904 

905 

class DiskCache:
    """Disk-based cache for large intermediates.

    Spills cache to disk when memory limit exceeded.

    References:
        MEM-031: Persistent Cache (Disk-Based)
    """

    def __init__(
        self,
        cache_dir: str | Path | None = None,
        max_memory_mb: int = 1024,
        max_disk_mb: int = 10240,
        ttl_hours: float = 1.0,
    ) -> None:
        """Initialize disk cache.

        Args:
            cache_dir: Cache directory (default: temp dir)
            max_memory_mb: Max in-memory cache size in MB
            max_disk_mb: Max on-disk cache size in MB
            ttl_hours: Time-to-live in hours
        """
        self._cache_dir = (
            Path(cache_dir) if cache_dir else Path(tempfile.gettempdir()) / "tracekit_cache"
        )
        self._cache_dir.mkdir(parents=True, exist_ok=True)
        self._max_memory = max_memory_mb * 1024 * 1024
        self._max_disk = max_disk_mb * 1024 * 1024
        self._ttl_seconds = ttl_hours * 3600
        # In-memory LRU tier: key -> (value, estimated size in bytes).
        self._memory_cache: OrderedDict[str, tuple[Any, int]] = OrderedDict()
        self._memory_used = 0
        # Non-reentrant: methods that hold it must not call _add_to_memory.
        self._lock = threading.Lock()

    def _get_cache_path(self, key: str) -> Path:
        """Get cache file path for key."""
        # Keys are hashed so arbitrary strings map to safe filenames.
        key_hash = hashlib.sha256(key.encode()).hexdigest()[:16]
        return self._cache_dir / f"{key_hash}.cache"

    def _estimate_size(self, value: Any) -> int:
        """Estimate memory size of value."""
        if isinstance(value, np.ndarray):
            return value.nbytes  # type: ignore[no-any-return]
        else:
            # Fallback: pickled size as a proxy for in-memory footprint.
            return len(pickle.dumps(value))

    def get(self, key: str) -> tuple[Any, bool]:
        """Get value from cache.

        Args:
            key: Cache key

        Returns:
            (value, hit) tuple
        """
        # Check memory cache first
        with self._lock:
            if key in self._memory_cache:
                value, size = self._memory_cache[key]
                self._memory_cache.move_to_end(key)
                return value, True

        # Check disk cache
        # NOTE(review): performed outside the lock (which _add_to_memory
        # re-acquires below); a concurrent set()/clear() can race with this
        # read — confirm callers tolerate stale or duplicate loads.
        cache_path = self._get_cache_path(key)
        if cache_path.exists():
            # Check TTL (tracked via the cache file's mtime)
            if time.time() - cache_path.stat().st_mtime > self._ttl_seconds:
                cache_path.unlink()
                return None, False

            try:
                with open(cache_path, "rb") as f:
                    value = pickle.load(f)

                # Promote to memory cache if space
                size = self._estimate_size(value)
                if size < self._max_memory:
                    self._add_to_memory(key, value, size)

                return value, True
            except Exception as e:
                # Corrupt/unreadable cache file degrades to a miss.
                logger.warning(f"Failed to load cache: {e}")
                return None, False

        return None, False

    def _add_to_memory(self, key: str, value: Any, size: int) -> None:
        """Add to memory cache, evicting if needed."""
        with self._lock:
            # Evict until we have space
            while self._memory_used + size > self._max_memory and self._memory_cache:
                evict_key, (evict_value, evict_size) = self._memory_cache.popitem(last=False)
                self._memory_used -= evict_size
                # Spill to disk
                self._write_to_disk(evict_key, evict_value)

            self._memory_cache[key] = (value, size)
            self._memory_used += size

    def _write_to_disk(self, key: str, value: Any) -> None:
        """Write value to disk cache."""
        # Check disk space
        self._cleanup_disk()

        cache_path = self._get_cache_path(key)
        try:
            with open(cache_path, "wb") as f:
                pickle.dump(value, f)
        except Exception as e:
            # Best-effort: a failed spill only loses the cached value.
            logger.warning(f"Failed to write cache: {e}")

    def _cleanup_disk(self) -> None:
        """Clean up disk cache if over limit."""
        try:
            total_size = sum(f.stat().st_size for f in self._cache_dir.glob("*.cache"))
            if total_size > self._max_disk:
                # Remove oldest files
                files = sorted(self._cache_dir.glob("*.cache"), key=lambda f: f.stat().st_mtime)
                for f in files:
                    # Stop once usage drops below 80% of the disk budget.
                    if total_size <= self._max_disk * 0.8:
                        break
                    total_size -= f.stat().st_size
                    f.unlink()
        except Exception as e:
            logger.warning(f"Disk cleanup error: {e}")

    def set(self, key: str, value: Any) -> None:
        """Set cache value.

        Args:
            key: Cache key
            value: Value to cache
        """
        size = self._estimate_size(value)
        if size < self._max_memory:
            self._add_to_memory(key, value, size)
        else:
            # Too large for memory, write directly to disk
            self._write_to_disk(key, value)

    def clear(self) -> None:
        """Clear all caches."""
        with self._lock:
            self._memory_cache.clear()
            self._memory_used = 0

        for f in self._cache_dir.glob("*.cache"):
            with contextlib.suppress(Exception):
                f.unlink()

1056 

1057 

1058# ============================================================================= 

1059# ============================================================================= 

1060 

1061 

class BackpressureController:
    """Backpressure controller for streaming.

    Manages slow consumer handling for real-time streaming: a bounded
    FIFO buffer with either a drop-oldest or a pause-the-source overflow
    policy.

    References:
        MEM-032: Backpressure for Streaming
    """

    def __init__(
        self,
        buffer_size: int = 10000,
        drop_oldest: bool = True,
        warn_threshold: float = 0.8,
    ) -> None:
        """Initialize backpressure controller.

        Args:
            buffer_size: Maximum buffer size
            drop_oldest: Drop oldest on overflow (vs pause)
            warn_threshold: Warning threshold (0-1)
        """
        # deque gives O(1) FIFO pops; list.pop(0) is O(n) and dominated
        # push/pop cost at the default 10k buffer size.  Imported locally
        # to keep the change self-contained; hoisting to the module import
        # block is preferable.
        from collections import deque

        self._buffer: deque[Any] = deque()
        self._buffer_size = buffer_size
        self._drop_oldest = drop_oldest
        self._warn_threshold = warn_threshold
        self._dropped_count = 0  # total frames dropped due to overflow
        self._paused = False  # True when the source should stop pushing
        self._lock = threading.Lock()

    @property
    def is_paused(self) -> bool:
        """Check if acquisition should be paused."""
        return self._paused

    @property
    def buffer_usage(self) -> float:
        """Get buffer usage ratio (0-1)."""
        return len(self._buffer) / self._buffer_size

    @property
    def dropped_count(self) -> int:
        """Get number of dropped samples."""
        return self._dropped_count

    def push(self, data: Any) -> bool:
        """Push data to buffer.

        Args:
            data: Data to buffer

        Returns:
            True if accepted, False if dropped
        """
        with self._lock:
            if len(self._buffer) >= self._buffer_size:
                if self._drop_oldest:
                    # Make room by discarding the oldest frame; O(1) with
                    # deque (was O(n) with list.pop(0)).
                    self._buffer.popleft()
                    self._dropped_count += 1
                    logger.warning(
                        f"Buffer overflow: dropped frame ({self._dropped_count} total dropped)"
                    )
                else:
                    self._paused = True
                    return False

            self._buffer.append(data)

            # Check warning threshold
            if self.buffer_usage > self._warn_threshold:
                logger.warning(
                    f"Buffer usage at {self.buffer_usage * 100:.0f}% - "
                    f"analysis slower than acquisition"
                )

            return True

    def pop(self) -> Any | None:
        """Pop data from buffer.

        Returns:
            Data or None if empty
        """
        with self._lock:
            if not self._buffer:
                return None

            data = self._buffer.popleft()
            # Any successful pop means the consumer is making progress.
            self._paused = False
            return data

    def pop_all(self) -> list[Any]:
        """Pop all data from buffer.

        Returns:
            List of all buffered data
        """
        with self._lock:
            data = list(self._buffer)
            self._buffer.clear()
            self._paused = False
            return data

    def signal_backpressure(self) -> None:
        """Signal backpressure to source."""
        with self._lock:
            self._paused = True

    def get_stats(self) -> dict[str, Any]:
        """Get backpressure statistics.

        Returns:
            Statistics dict

        NOTE(review): reads here are unsynchronized, so the fields may be
        mutually inconsistent under concurrent push/pop — acceptable for
        diagnostics.
        """
        return {
            "buffer_size": len(self._buffer),
            "buffer_capacity": self._buffer_size,
            "usage_ratio": self.buffer_usage,
            "dropped_count": self._dropped_count,
            "is_paused": self._paused,
        }

1183 

1184 

1185# ============================================================================= 

1186# ============================================================================= 

1187 

1188 

class MultiChannelMemoryManager:
    """Multi-channel memory management.

    Manages memory for multi-channel processing, enabling
    sequential or subset processing for bounded memory.

    References:
        MEM-033: Multi-Channel Memory Management
    """

    def __init__(
        self,
        max_memory_mb: int = 4096,
        bytes_per_sample: int = 8,
    ) -> None:
        """Initialize manager.

        Args:
            max_memory_mb: Maximum memory in MB
            bytes_per_sample: Bytes per sample (default 8 for float64)
        """
        # Budget is stored in bytes (MiB * 1024 * 1024).
        self._max_memory = max_memory_mb * 1024 * 1024
        self._bytes_per_sample = bytes_per_sample

    def _max_channels_at_once(self, samples_per_channel: int) -> int | None:
        """Return how many channels fit in the budget at once (>= 1).

        Returns None when one channel occupies zero bytes (degenerate
        input), meaning there is effectively no per-batch limit.
        """
        memory_per_channel = samples_per_channel * self._bytes_per_sample
        if memory_per_channel <= 0:
            return None
        # Floor division: int(a / b) goes through float and loses precision
        # once a / b exceeds float's 53-bit mantissa.
        return max(1, self._max_memory // memory_per_channel)

    def estimate_channel_memory(
        self,
        samples_per_channel: int,
        num_channels: int,
    ) -> int:
        """Estimate memory for channels.

        Args:
            samples_per_channel: Samples per channel
            num_channels: Number of channels

        Returns:
            Estimated memory in bytes
        """
        return samples_per_channel * num_channels * self._bytes_per_sample

    def can_load_all(
        self,
        samples_per_channel: int,
        num_channels: int,
    ) -> bool:
        """Check if all channels can be loaded at once.

        Args:
            samples_per_channel: Samples per channel
            num_channels: Number of channels

        Returns:
            True if all can be loaded
        """
        required = self.estimate_channel_memory(samples_per_channel, num_channels)
        return required < self._max_memory

    def get_channel_batches(
        self,
        samples_per_channel: int,
        channel_indices: list[int],
    ) -> list[list[int]]:
        """Get channel batches for sequential processing.

        Args:
            samples_per_channel: Samples per channel
            channel_indices: Channel indices to process

        Returns:
            List of channel batches
        """
        channels_at_once = self._max_channels_at_once(samples_per_channel)
        if channels_at_once is None:
            # Zero-byte channels: everything fits in one batch (previously
            # raised ZeroDivisionError).
            return [list(channel_indices)] if channel_indices else []

        return [
            channel_indices[i : i + channels_at_once]
            for i in range(0, len(channel_indices), channels_at_once)
        ]

    def suggest_subset(
        self,
        samples_per_channel: int,
        total_channels: int,
    ) -> dict[str, Any]:
        """Suggest channel subset for memory-bounded analysis.

        Args:
            samples_per_channel: Samples per channel
            total_channels: Total available channels

        Returns:
            Suggestion dict
        """
        required = self.estimate_channel_memory(samples_per_channel, total_channels)

        if required < self._max_memory:
            return {
                "can_load_all": True,
                "suggested_channels": list(range(total_channels)),
                "memory_required_gb": required / 1e9,
            }

        # Calculate how many channels we can handle at once.
        max_channels = self._max_channels_at_once(samples_per_channel)
        if max_channels is None:
            # Only reachable with a zero memory budget; fall back to 1.
            max_channels = 1

        return {
            "can_load_all": False,
            "max_channels_at_once": max_channels,
            "suggested_channels": list(range(min(max_channels, total_channels))),
            "memory_required_gb": required / 1e9,
            "memory_limit_gb": self._max_memory / 1e9,
            "recommendation": (
                f"Process channels in batches of {max_channels}, "
                f"or use --channels 0,1,2,... to select subset"
            ),
        }

    def iterate_channels(
        self,
        samples_per_channel: int,
        channel_indices: list[int],
    ) -> Iterator[list[int]]:
        """Iterate over channel batches.

        Args:
            samples_per_channel: Samples per channel
            channel_indices: Channels to process

        Yields:
            Channel index batches
        """
        batches = self.get_channel_batches(samples_per_channel, channel_indices)
        yield from batches