Coverage for src / tracekit / utils / memory_advanced.py: 67%
478 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Advanced memory management utilities.
3This module provides advanced memory management features including
4quality modes, cache management, garbage collection, and streaming
5backpressure support.
6"""
from __future__ import annotations

import contextlib
import gc
import hashlib
import json
import logging
import pickle
import tempfile
import threading
import time
from collections import OrderedDict, deque
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, TypeVar

import numpy as np

if TYPE_CHECKING:
    from collections.abc import Iterator
30logger = logging.getLogger(__name__)
# Public API. Fix: set_quality_mode / get_quality_mode / get_quality_config,
# force_gc and MemoryLogEntry are defined below as public names but were
# previously missing from __all__.
__all__ = [
    "AdaptiveMeasurementSelector",
    "BackpressureController",
    "CacheEntry",
    "CacheInvalidationStrategy",
    "DiskCache",
    "GCController",
    "MemoryLogEntry",
    "MemoryLogger",
    "MultiChannelMemoryManager",
    "QualityMode",
    "QualityModeConfig",
    "WSLSwapChecker",
    "adaptive_measurements",
    "force_gc",
    "gc_aggressive",
    "get_quality_config",
    "get_quality_mode",
    "get_wsl_memory_limits",
    "set_quality_mode",
]
50# =============================================================================
51# =============================================================================
class QualityMode(Enum):
    """Operating quality level for memory-constrained processing.

    References:
        MEM-014: Quality vs Memory Trade-offs
    """

    PREVIEW = "preview"  # fastest, lowest memory, approximate results
    BALANCED = "balanced"  # default trade-off between quality and memory
    HIGH_QUALITY = "high"  # most accurate; may require more memory
@dataclass
class QualityModeConfig:
    """Tunable processing parameters associated with a quality mode.

    Attributes:
        mode: Quality mode this configuration belongs to.
        downsample_factor: Input downsampling factor (used by preview mode).
        nfft_factor: Multiplier applied to the FFT size.
        overlap_factor: Multiplier applied to the segment overlap.
        enable_caching: Whether intermediate caching is allowed.
        use_approximations: Whether faster approximate algorithms are allowed.

    References:
        MEM-014: Quality vs Memory Trade-offs
    """

    mode: QualityMode = QualityMode.BALANCED
    downsample_factor: int = 1
    nfft_factor: float = 1.0
    overlap_factor: float = 1.0
    enable_caching: bool = True
    use_approximations: bool = False

    @classmethod
    def for_mode(cls, mode: QualityMode | str) -> QualityModeConfig:
        """Build the preset configuration for *mode*.

        Args:
            mode: Quality mode (enum member or its string value).

        Returns:
            Preset configuration for the requested mode.
        """
        if isinstance(mode, str):
            mode = QualityMode(mode.lower())

        if mode == QualityMode.PREVIEW:
            # Trade accuracy for speed and a small footprint.
            preset = {
                "downsample_factor": 8,
                "nfft_factor": 0.25,
                "overlap_factor": 0.5,
                "enable_caching": False,
                "use_approximations": True,
            }
        elif mode == QualityMode.HIGH_QUALITY:
            # Larger FFTs and more overlap for maximum fidelity.
            preset = {
                "downsample_factor": 1,
                "nfft_factor": 2.0,
                "overlap_factor": 1.5,
                "enable_caching": True,
                "use_approximations": False,
            }
        else:  # BALANCED
            preset = {
                "downsample_factor": 1,
                "nfft_factor": 1.0,
                "overlap_factor": 1.0,
                "enable_caching": True,
                "use_approximations": False,
            }
        return cls(mode=mode, **preset)
131# Global quality mode
132_current_quality_mode = QualityMode.BALANCED
def set_quality_mode(mode: QualityMode | str) -> None:
    """Set the process-wide quality mode.

    Args:
        mode: Quality mode to activate (enum member or its string value).
    """
    global _current_quality_mode
    resolved = QualityMode(mode.lower()) if isinstance(mode, str) else mode
    _current_quality_mode = resolved
    logger.info(f"Quality mode set to: {resolved.value}")
def get_quality_mode() -> QualityMode:
    """Return the process-wide quality mode (set via set_quality_mode)."""
    return _current_quality_mode
def get_quality_config() -> QualityModeConfig:
    """Return the preset QualityModeConfig for the current global quality mode."""
    return QualityModeConfig.for_mode(_current_quality_mode)
158# =============================================================================
159# =============================================================================
class GCController:
    """Garbage collection controller.

    Decides when and how the interpreter's garbage collector runs, e.g.
    forcing full collections after large operations.

    References:
        MEM-020: Garbage Collection Triggers
    """

    def __init__(self, aggressive: bool = False) -> None:
        """Initialize the controller.

        Args:
            aggressive: Enable aggressive GC mode (full gen-2 sweeps).
        """
        self._aggressive = aggressive
        self._collection_count = 0
        # NOTE(review): despite the name this accumulates the *object*
        # counts returned by gc.collect(), not bytes — confirm before
        # relying on the "bytes_collected" stat.
        self._bytes_collected = 0

    @property
    def aggressive(self) -> bool:
        """Whether aggressive mode is enabled."""
        return self._aggressive

    @aggressive.setter
    def aggressive(self, value: bool) -> None:
        """Enable or disable aggressive mode."""
        self._aggressive = value

    def collect(self) -> int:
        """Run a garbage collection pass.

        Returns:
            Number of unreachable objects found by the collector.
        """
        if self._aggressive:
            # Aggressive: sweep every generation up to and including gen 2.
            collected = gc.collect(generation=2)
        else:
            collected = gc.collect()

        self._collection_count += 1
        self._bytes_collected += collected
        logger.debug(f"GC collected {collected} objects")
        return collected

    def collect_after_operation(self) -> None:
        """Run GC after a large operation (aggressive mode only)."""
        if self._aggressive:
            gc.collect()
            # A second pass catches objects freed by the first (cycles).
            gc.collect()

    def get_stats(self) -> dict[str, Any]:
        """Return a snapshot of collection statistics.

        Returns:
            Dict with this controller's counters plus the interpreter's
            GC thresholds and per-generation counts.
        """
        return {
            "collection_count": self._collection_count,
            "bytes_collected": self._bytes_collected,
            "aggressive_mode": self._aggressive,
            "gc_threshold": gc.get_threshold(),
            "gc_count": gc.get_count(),
        }
232# Global GC controller
233_gc_controller = GCController()
def gc_aggressive(enable: bool = True) -> None:
    """Enable/disable aggressive garbage collection.

    Toggles aggressive mode on the module-level GC controller.

    Args:
        enable: Whether to enable aggressive GC
    """
    _gc_controller.aggressive = enable
def force_gc() -> int:
    """Force a garbage collection pass via the module-level GC controller.

    Returns:
        Number of objects collected
    """
    return _gc_controller.collect()
254# =============================================================================
255# =============================================================================
class WSLSwapChecker:
    """WSL swap availability checker.

    Detects a WSL environment and derives conservative memory estimates,
    since WSL typically has limited swap.

    References:
        MEM-023: WSL Swap Awareness
    """

    def __init__(self) -> None:
        """Detect WSL and prepare lazy .wslconfig parsing state."""
        self._is_wsl = self._detect_wsl()
        self._wslconfig_parsed = False
        self._wslconfig_memory: int | None = None
        self._wslconfig_swap: int | None = None

    def _detect_wsl(self) -> bool:
        """Return True when /proc/version mentions Microsoft/WSL."""
        try:
            with open("/proc/version") as fh:
                banner = fh.read().lower()
        except FileNotFoundError:
            # Non-Linux platforms have no /proc/version.
            return False
        return "microsoft" in banner or "wsl" in banner

    @property
    def is_wsl(self) -> bool:
        """Whether the current process runs inside WSL."""
        return self._is_wsl

    def get_wsl_memory_limit(self) -> int | None:
        """Return the memory limit from ~/.wslconfig, if configured.

        Returns:
            Memory limit in bytes, or None outside WSL / when unset.
        """
        if not self._is_wsl:
            return None
        if self._wslconfig_parsed:
            return self._wslconfig_memory

        # Parse ~/.wslconfig once; memory and swap are captured together.
        cfg = Path.home() / ".wslconfig"
        if cfg.exists():
            try:
                for raw in cfg.read_text().split("\n"):
                    entry = raw.strip().lower()
                    if entry.startswith("memory="):
                        self._wslconfig_memory = self._parse_size(raw.split("=")[1].strip())
                    elif entry.startswith("swap="):
                        self._wslconfig_swap = self._parse_size(raw.split("=")[1].strip())
            except Exception as e:
                logger.warning(f"Failed to parse .wslconfig: {e}")

        self._wslconfig_parsed = True
        return self._wslconfig_memory

    def get_wsl_swap_limit(self) -> int | None:
        """Return the swap limit from ~/.wslconfig, if configured.

        Returns:
            Swap limit in bytes, or None outside WSL / when unset.
        """
        if not self._is_wsl:
            return None
        if not self._wslconfig_parsed:
            # Memory and swap are parsed together on first access.
            self.get_wsl_memory_limit()
        return self._wslconfig_swap

    def _parse_size(self, size_str: str) -> int:
        """Convert a size string such as '8GB' or '512M' to bytes."""
        text = size_str.strip().upper()
        units = (
            ("K", 1024),
            ("KB", 1024),
            ("M", 1024**2),
            ("MB", 1024**2),
            ("G", 1024**3),
            ("GB", 1024**3),
            ("T", 1024**4),
            ("TB", 1024**4),
        )
        for suffix, factor in units:
            if text.endswith(suffix):
                return int(float(text[: -len(suffix)]) * factor)
        # No recognised suffix: interpret as a plain byte count.
        return int(text)

    def get_safe_memory(self) -> int:
        """Return a recommended upper bound on memory usage.

        Returns:
            Conservative maximum number of bytes to allocate.
        """
        import psutil

        if not self._is_wsl:
            # Outside WSL: 80% of currently-available memory.
            return int(psutil.virtual_memory().available * 0.8)

        # WSL has minimal swap by default — budget from physical RAM only.
        total = psutil.virtual_memory().total
        limit = self.get_wsl_memory_limit()
        if limit is not None:
            total = min(total, limit)

        # Use only half of available/total for safety (WSL can be unpredictable).
        available = psutil.virtual_memory().available
        return min(int(available * 0.5), int(total * 0.5))
def get_wsl_memory_limits() -> dict[str, int | None]:
    """Collect WSL memory information in a single call.

    Returns:
        Dict with the detection flag plus memory/swap/safe-memory limits.
    """
    probe = WSLSwapChecker()
    return {
        "is_wsl": probe.is_wsl,  # bool, despite the int | None value annotation
        "memory_limit": probe.get_wsl_memory_limit(),
        "swap_limit": probe.get_wsl_swap_limit(),
        "safe_memory": probe.get_safe_memory(),
    }
395# =============================================================================
396# =============================================================================
@dataclass
class MemoryLogEntry:
    """Single memory log entry.

    Attributes:
        timestamp: Entry timestamp (seconds since the epoch, from time.time())
        operation: Operation name
        memory_used: Memory used in bytes (process RSS at log time)
        memory_peak: Peak memory observed so far, in bytes
        duration: Operation duration in seconds
    """

    timestamp: float
    operation: str
    memory_used: int
    memory_peak: int
    duration: float
class MemoryLogger:
    """Memory usage logger for debugging.

    Records process memory at each logged operation so usage can be
    analysed afterwards.

    References:
        MEM-025: Memory Usage Logging
    """

    def __init__(
        self,
        log_file: str | Path | None = None,
        format: str = "csv",
    ) -> None:
        """Initialize memory logger.

        Args:
            log_file: Output file path (no file output when None).
            format: Output format ('csv' or 'json').
        """
        self._log_file = Path(log_file) if log_file else None
        self._format = format
        self._entries: list[MemoryLogEntry] = []
        self._enabled = False
        self._peak_memory = 0
        self._start_memory = 0
        self._lock = threading.Lock()

    def enable(self) -> None:
        """Start logging; snapshots current RSS as the baseline."""
        self._enabled = True
        import psutil

        rss = psutil.Process().memory_info().rss
        self._start_memory = rss
        self._peak_memory = rss
        logger.info("Memory logging enabled")

    def disable(self) -> None:
        """Stop logging and flush any buffered entries."""
        self._enabled = False
        self.flush()

    def log_operation(
        self,
        operation: str,
        duration: float = 0.0,
    ) -> None:
        """Record current memory usage for an operation.

        Args:
            operation: Operation name.
            duration: Operation duration in seconds.
        """
        if not self._enabled:
            return

        import psutil

        rss = psutil.Process().memory_info().rss
        self._peak_memory = max(self._peak_memory, rss)

        record = MemoryLogEntry(
            timestamp=time.time(),
            operation=operation,
            memory_used=rss,
            memory_peak=self._peak_memory,
            duration=duration,
        )
        with self._lock:
            self._entries.append(record)

    def flush(self) -> None:
        """Write buffered entries to the log file and clear the buffer."""
        if self._log_file is None or not self._entries:
            return

        with self._lock:
            pending = self._entries.copy()
            self._entries.clear()

        if self._format == "csv":
            self._write_csv(pending)
        else:
            self._write_json(pending)

    def _write_csv(self, entries: list[MemoryLogEntry]) -> None:
        """Append entries as CSV rows (header only on first write)."""
        import csv

        assert self._log_file is not None
        mode = "a" if self._log_file.exists() else "w"
        with open(self._log_file, mode, newline="") as f:
            writer = csv.writer(f)
            if mode == "w":
                writer.writerow(
                    ["timestamp", "operation", "memory_used", "memory_peak", "duration"]
                )
            writer.writerows(
                [e.timestamp, e.operation, e.memory_used, e.memory_peak, e.duration]
                for e in entries
            )

    def _write_json(self, entries: list[MemoryLogEntry]) -> None:
        """Append entries as JSON Lines (one object per line)."""
        assert self._log_file is not None
        rows = (
            {
                "timestamp": e.timestamp,
                "operation": e.operation,
                "memory_used": e.memory_used,
                "memory_peak": e.memory_peak,
                "duration": e.duration,
            }
            for e in entries
        )
        with open(self._log_file, "a") as f:
            f.writelines(json.dumps(row) + "\n" for row in rows)

    def get_summary(self) -> dict[str, Any]:
        """Summarize logging state.

        Returns:
            Summary statistics; entries_logged counts *unflushed* entries.
        """
        return {
            "entries_logged": len(self._entries),
            "peak_memory_bytes": self._peak_memory,
            "start_memory_bytes": self._start_memory,
            "memory_growth_bytes": self._peak_memory - self._start_memory,
            "enabled": self._enabled,
        }
561# =============================================================================
562# =============================================================================
class AdaptiveMeasurementSelector:
    """Adaptive measurement selection for large files.

    Disables memory-intensive measurements for very large files
    and suggests alternatives.

    References:
        MEM-028: Adaptive Measurement Selection
    """

    # Per-measurement size thresholds, in samples; files at or above the
    # threshold have the measurement disabled (unless enable_all is set).
    THRESHOLDS = {  # noqa: RUF012
        "eye_diagram": 1e8,  # 100M samples
        "spectrogram": 5e8,  # 500M samples
        "full_correlation": 1e9,  # 1B samples
        "wavelet": 2e8,  # 200M samples
    }

    def __init__(
        self,
        file_size_samples: int,
        enable_all: bool = False,
    ) -> None:
        """Initialize selector.

        Args:
            file_size_samples: Number of samples in file.
            enable_all: Override to enable all measurements.
        """
        self._size = file_size_samples
        self._enable_all = enable_all

    def is_enabled(self, measurement: str) -> bool:
        """Check if a measurement is enabled for the current file size.

        Measurements without a registered threshold are always enabled.

        Args:
            measurement: Measurement name.

        Returns:
            True if the measurement should be enabled.
        """
        if self._enable_all:
            return True

        threshold = self.THRESHOLDS.get(measurement, float("inf"))
        return self._size < threshold

    def get_recommendations(self) -> dict[str, str]:
        """Get recommendations for disabled measurements.

        Returns:
            Dict mapping each disabled measurement to a recommendation.
        """
        recommendations: dict[str, str] = {}

        for measurement, threshold in self.THRESHOLDS.items():
            if self._size < threshold:
                continue
            size_gb = self._size * 8 / 1e9  # assumes float64 samples
            if measurement == "eye_diagram":
                recommendations[measurement] = (
                    f"File size ({size_gb:.1f} GB) exceeds threshold. "
                    f"Use --roi START:END to specify time range, or "
                    f"--enable-all to force processing."
                )
            elif measurement == "spectrogram":
                recommendations[measurement] = (
                    "File too large for full spectrogram. "
                    "Use chunked_spectrogram() or downsample data."
                )
            elif measurement == "full_correlation":
                recommendations[measurement] = (
                    f"Correlation on {size_gb:.1f} GB requires chunked approach. "
                    f"Use correlate_chunked() instead."
                )
            else:
                # Fix: disabled measurements without a dedicated message
                # (e.g. "wavelet") previously produced no recommendation
                # at all, contradicting this method's contract.
                recommendations[measurement] = (
                    f"File size ({size_gb:.1f} GB) exceeds the threshold for "
                    f"'{measurement}'. Use --enable-all to force processing."
                )

        return recommendations
def adaptive_measurements(
    samples: int,
    enable_all: bool = False,
) -> AdaptiveMeasurementSelector:
    """Convenience factory for :class:`AdaptiveMeasurementSelector`.

    Args:
        samples: Number of samples in the file.
        enable_all: Override to enable all measurements.

    Returns:
        Configured selector instance.
    """
    selector = AdaptiveMeasurementSelector(samples, enable_all)
    return selector
659# =============================================================================
660# =============================================================================
# NOTE(review): CacheEntry below uses PEP 695 class-scoped generics, so this
# module-level TypeVar appears unused in the visible code — confirm before removing.
T = TypeVar("T")
666@dataclass
667class CacheEntry[T]:
668 """Cache entry with metadata.
670 Attributes:
671 key: Cache key
672 value: Cached value
673 created_at: Creation timestamp
674 accessed_at: Last access timestamp
675 source_hash: Hash of source data
676 params_hash: Hash of parameters
677 ttl_seconds: Time-to-live in seconds
679 References:
680 MEM-030: Cache Invalidation Strategy
681 """
683 key: str
684 value: T
685 created_at: float
686 accessed_at: float
687 source_hash: str
688 params_hash: str
689 ttl_seconds: float = 3600.0 # 1 hour default
691 @property
692 def is_expired(self) -> bool:
693 """Check if entry has expired."""
694 if self.ttl_seconds <= 0: 694 ↛ 695line 694 didn't jump to line 695 because the condition on line 694 was never true
695 return False
696 return time.time() - self.created_at > self.ttl_seconds
698 @property
699 def age_seconds(self) -> float:
700 """Get entry age in seconds."""
701 return time.time() - self.created_at
704class CacheInvalidationStrategy:
705 """Cache invalidation strategy manager.
707 Manages cache invalidation based on data changes,
708 parameter changes, and time-to-live.
710 References:
711 MEM-030: Cache Invalidation Strategy
712 """
714 def __init__(
715 self,
716 max_size: int = 1000,
717 default_ttl: float = 3600.0,
718 ) -> None:
719 """Initialize cache.
721 Args:
722 max_size: Maximum entries
723 default_ttl: Default TTL in seconds
724 """
725 self._cache: OrderedDict[str, CacheEntry[Any]] = OrderedDict()
726 self._max_size = max_size
727 self._default_ttl = default_ttl
728 self._lock = threading.Lock()
729 self._hits = 0
730 self._misses = 0
732 def _compute_hash(self, data: Any) -> str:
733 """Compute hash of data for comparison."""
734 if isinstance(data, np.ndarray): 734 ↛ 736line 734 didn't jump to line 736 because the condition on line 734 was always true
735 return hashlib.md5(data.tobytes()[:1024]).hexdigest()
736 elif isinstance(data, dict | list):
737 return hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()
738 else:
739 return hashlib.md5(str(data).encode()).hexdigest()
741 def get(
742 self,
743 key: str,
744 source_data: Any = None,
745 params: dict[str, Any] | None = None,
746 ) -> tuple[Any, bool]:
747 """Get value from cache.
749 Args:
750 key: Cache key
751 source_data: Source data to validate against
752 params: Parameters to validate against
754 Returns:
755 (value, hit) tuple
756 """
757 with self._lock:
758 if key not in self._cache:
759 self._misses += 1
760 return None, False
762 entry = self._cache[key]
764 # Check expiration
765 if entry.is_expired: 765 ↛ 766line 765 didn't jump to line 766 because the condition on line 765 was never true
766 del self._cache[key]
767 self._misses += 1
768 return None, False
770 # Check source data change
771 if source_data is not None:
772 source_hash = self._compute_hash(source_data)
773 if source_hash != entry.source_hash:
774 del self._cache[key]
775 self._misses += 1
776 return None, False
778 # Check parameter change
779 if params is not None: 779 ↛ 780line 779 didn't jump to line 780 because the condition on line 779 was never true
780 params_hash = self._compute_hash(params)
781 if params_hash != entry.params_hash:
782 del self._cache[key]
783 self._misses += 1
784 return None, False
786 # Update access time and move to end
787 entry.accessed_at = time.time()
788 self._cache.move_to_end(key)
789 self._hits += 1
790 return entry.value, True
792 def set(
793 self,
794 key: str,
795 value: Any,
796 source_data: Any = None,
797 params: dict[str, Any] | None = None,
798 ttl: float | None = None,
799 ) -> None:
800 """Set cache value.
802 Args:
803 key: Cache key
804 value: Value to cache
805 source_data: Source data for invalidation
806 params: Parameters for invalidation
807 ttl: Time-to-live (uses default if None)
808 """
809 with self._lock:
810 # Evict if at capacity
811 while len(self._cache) >= self._max_size: 811 ↛ 812line 811 didn't jump to line 812 because the condition on line 811 was never true
812 self._cache.popitem(last=False)
814 entry = CacheEntry(
815 key=key,
816 value=value,
817 created_at=time.time(),
818 accessed_at=time.time(),
819 source_hash=self._compute_hash(source_data) if source_data is not None else "",
820 params_hash=self._compute_hash(params) if params is not None else "",
821 ttl_seconds=ttl if ttl is not None else self._default_ttl,
822 )
823 self._cache[key] = entry
825 def invalidate(self, key: str) -> bool:
826 """Invalidate specific cache entry.
828 Args:
829 key: Key to invalidate
831 Returns:
832 True if entry existed
833 """
834 with self._lock:
835 if key in self._cache: 835 ↛ 838line 835 didn't jump to line 838 because the condition on line 835 was always true
836 del self._cache[key]
837 return True
838 return False
840 def invalidate_by_source(self, source_data: Any) -> int:
841 """Invalidate all entries with matching source.
843 Args:
844 source_data: Source data to match
846 Returns:
847 Number of entries invalidated
848 """
849 source_hash = self._compute_hash(source_data)
850 count = 0
851 with self._lock:
852 keys_to_remove = [k for k, v in self._cache.items() if v.source_hash == source_hash]
853 for key in keys_to_remove:
854 del self._cache[key]
855 count += 1
856 return count
858 def clear(self) -> int:
859 """Clear all cache entries.
861 Returns:
862 Number of entries cleared
863 """
864 with self._lock:
865 count = len(self._cache)
866 self._cache.clear()
867 return count
869 def cleanup_expired(self) -> int:
870 """Remove expired entries.
872 Returns:
873 Number of entries removed
874 """
875 count = 0
876 with self._lock:
877 keys_to_remove = [k for k, v in self._cache.items() if v.is_expired]
878 for key in keys_to_remove:
879 del self._cache[key]
880 count += 1
881 return count
883 def get_stats(self) -> dict[str, Any]:
884 """Get cache statistics.
886 Returns:
887 Cache statistics
888 """
889 with self._lock:
890 return {
891 "size": len(self._cache),
892 "max_size": self._max_size,
893 "hits": self._hits,
894 "misses": self._misses,
895 "hit_rate": self._hits / (self._hits + self._misses)
896 if (self._hits + self._misses) > 0
897 else 0,
898 "default_ttl": self._default_ttl,
899 }
902# =============================================================================
903# =============================================================================
class DiskCache:
    """Disk-backed cache for large intermediate results.

    Keeps a bounded in-memory LRU layer and spills evicted or oversized
    values to pickle files on disk.

    References:
        MEM-031: Persistent Cache (Disk-Based)
    """

    def __init__(
        self,
        cache_dir: str | Path | None = None,
        max_memory_mb: int = 1024,
        max_disk_mb: int = 10240,
        ttl_hours: float = 1.0,
    ) -> None:
        """Initialize disk cache.

        Args:
            cache_dir: Cache directory (default: <tempdir>/tracekit_cache).
            max_memory_mb: Max in-memory cache size in MB.
            max_disk_mb: Max on-disk cache size in MB.
            ttl_hours: Time-to-live in hours (checked against file mtime).
        """
        if cache_dir:
            self._cache_dir = Path(cache_dir)
        else:
            self._cache_dir = Path(tempfile.gettempdir()) / "tracekit_cache"
        self._cache_dir.mkdir(parents=True, exist_ok=True)
        self._max_memory = max_memory_mb * 1024 * 1024
        self._max_disk = max_disk_mb * 1024 * 1024
        self._ttl_seconds = ttl_hours * 3600
        self._memory_cache: OrderedDict[str, tuple[Any, int]] = OrderedDict()
        self._memory_used = 0
        self._lock = threading.Lock()

    def _get_cache_path(self, key: str) -> Path:
        """Map a key to its on-disk cache file."""
        digest = hashlib.sha256(key.encode()).hexdigest()[:16]
        return self._cache_dir / f"{digest}.cache"

    def _estimate_size(self, value: Any) -> int:
        """Estimate the in-memory footprint of *value* in bytes."""
        if isinstance(value, np.ndarray):
            return value.nbytes  # type: ignore[no-any-return]
        # Fallback: pickled length as a rough proxy for object size.
        return len(pickle.dumps(value))

    def get(self, key: str) -> tuple[Any, bool]:
        """Look up *key*, consulting memory first and disk second.

        Args:
            key: Cache key.

        Returns:
            (value, hit) tuple; value is None on a miss.
        """
        with self._lock:
            if key in self._memory_cache:
                cached, _size = self._memory_cache[key]
                self._memory_cache.move_to_end(key)
                return cached, True

        path = self._get_cache_path(key)
        if not path.exists():
            return None, False

        # Expired on disk? Drop the file and report a miss.
        if time.time() - path.stat().st_mtime > self._ttl_seconds:
            path.unlink()
            return None, False

        try:
            with open(path, "rb") as f:
                # NOTE: pickle is only safe because these files are written
                # by this process; never point cache_dir at untrusted data.
                cached = pickle.load(f)

            # Promote small values back into the memory layer.
            size = self._estimate_size(cached)
            if size < self._max_memory:
                self._add_to_memory(key, cached, size)
            return cached, True
        except Exception as e:
            logger.warning(f"Failed to load cache: {e}")
            return None, False

    def _add_to_memory(self, key: str, value: Any, size: int) -> None:
        """Insert into the memory layer, spilling LRU entries to disk."""
        with self._lock:
            while self._memory_used + size > self._max_memory and self._memory_cache:
                old_key, (old_value, old_size) = self._memory_cache.popitem(last=False)
                self._memory_used -= old_size
                # Evicted entries survive on disk instead of being lost.
                self._write_to_disk(old_key, old_value)

            self._memory_cache[key] = (value, size)
            self._memory_used += size

    def _write_to_disk(self, key: str, value: Any) -> None:
        """Pickle *value* to its cache file, best effort."""
        # Keep total disk usage under the configured limit first.
        self._cleanup_disk()

        try:
            with open(self._get_cache_path(key), "wb") as f:
                pickle.dump(value, f)
        except Exception as e:
            logger.warning(f"Failed to write cache: {e}")

    def _cleanup_disk(self) -> None:
        """Delete oldest cache files until usage drops below the disk limit."""
        try:
            files = list(self._cache_dir.glob("*.cache"))
            total = sum(f.stat().st_size for f in files)
            if total > self._max_disk:
                for f in sorted(files, key=lambda f: f.stat().st_mtime):
                    if total <= self._max_disk * 0.8:
                        break
                    total -= f.stat().st_size
                    f.unlink()
        except Exception as e:
            logger.warning(f"Disk cleanup error: {e}")

    def set(self, key: str, value: Any) -> None:
        """Store *value*, in memory when it fits and on disk otherwise.

        Args:
            key: Cache key.
            value: Value to cache.
        """
        size = self._estimate_size(value)
        if size < self._max_memory:
            self._add_to_memory(key, value, size)
        else:
            self._write_to_disk(key, value)

    def clear(self) -> None:
        """Drop every cached value from memory and disk."""
        with self._lock:
            self._memory_cache.clear()
            self._memory_used = 0

        for f in self._cache_dir.glob("*.cache"):
            with contextlib.suppress(Exception):
                f.unlink()
1058# =============================================================================
1059# =============================================================================
class BackpressureController:
    """Backpressure controller for streaming.

    Buffers frames between a producer and a slower consumer, either
    dropping the oldest frames or pausing the producer when the buffer
    fills.

    References:
        MEM-032: Backpressure for Streaming
    """

    def __init__(
        self,
        buffer_size: int = 10000,
        drop_oldest: bool = True,
        warn_threshold: float = 0.8,
    ) -> None:
        """Initialize backpressure controller.

        Args:
            buffer_size: Maximum buffer size
            drop_oldest: Drop oldest on overflow (vs pause)
            warn_threshold: Warning threshold (0-1)
        """
        # Fix: deque gives O(1) removal from the left; the previous
        # list-based buffer paid O(n) for every pop(0) on a large buffer.
        self._buffer: deque[Any] = deque()
        self._buffer_size = buffer_size
        self._drop_oldest = drop_oldest
        self._warn_threshold = warn_threshold
        self._dropped_count = 0
        self._paused = False
        self._lock = threading.Lock()

    @property
    def is_paused(self) -> bool:
        """Check if acquisition should be paused."""
        return self._paused

    @property
    def buffer_usage(self) -> float:
        """Get buffer usage ratio (0-1)."""
        return len(self._buffer) / self._buffer_size

    @property
    def dropped_count(self) -> int:
        """Get number of dropped samples."""
        return self._dropped_count

    def push(self, data: Any) -> bool:
        """Push data to buffer.

        Args:
            data: Data to buffer

        Returns:
            True if accepted, False if dropped (pause mode)
        """
        with self._lock:
            if len(self._buffer) >= self._buffer_size:
                if self._drop_oldest:
                    self._buffer.popleft()
                    self._dropped_count += 1
                    logger.warning(
                        f"Buffer overflow: dropped frame ({self._dropped_count} total dropped)"
                    )
                else:
                    self._paused = True
                    return False

            self._buffer.append(data)

            # Warn when the consumer is falling behind the producer.
            if self.buffer_usage > self._warn_threshold:
                logger.warning(
                    f"Buffer usage at {self.buffer_usage * 100:.0f}% - "
                    f"analysis slower than acquisition"
                )

            return True

    def pop(self) -> Any | None:
        """Pop the oldest buffered item.

        Returns:
            Data or None if empty
        """
        with self._lock:
            if not self._buffer:
                return None

            data = self._buffer.popleft()
            self._paused = False
            return data

    def pop_all(self) -> list[Any]:
        """Drain the buffer.

        Returns:
            List of all buffered data, oldest first
        """
        with self._lock:
            data = list(self._buffer)
            self._buffer.clear()
            self._paused = False
            return data

    def signal_backpressure(self) -> None:
        """Signal backpressure to source."""
        with self._lock:
            self._paused = True

    def get_stats(self) -> dict[str, Any]:
        """Get backpressure statistics.

        Returns:
            Statistics dict
        """
        return {
            "buffer_size": len(self._buffer),
            "buffer_capacity": self._buffer_size,
            "usage_ratio": self.buffer_usage,
            "dropped_count": self._dropped_count,
            "is_paused": self._paused,
        }
1185# =============================================================================
1186# =============================================================================
class MultiChannelMemoryManager:
    """Multi-channel memory management.

    Manages memory for multi-channel processing, enabling
    sequential or subset processing for bounded memory.

    References:
        MEM-033: Multi-Channel Memory Management
    """

    def __init__(
        self,
        max_memory_mb: int = 4096,
        bytes_per_sample: int = 8,
    ) -> None:
        """Initialize manager.

        Args:
            max_memory_mb: Maximum memory in MB
            bytes_per_sample: Bytes per sample (default 8 for float64)
        """
        self._max_memory = max_memory_mb * 1024 * 1024
        self._bytes_per_sample = bytes_per_sample

    def estimate_channel_memory(
        self,
        samples_per_channel: int,
        num_channels: int,
    ) -> int:
        """Estimate memory for channels.

        Args:
            samples_per_channel: Samples per channel
            num_channels: Number of channels

        Returns:
            Estimated memory in bytes
        """
        return samples_per_channel * num_channels * self._bytes_per_sample

    def can_load_all(
        self,
        samples_per_channel: int,
        num_channels: int,
    ) -> bool:
        """Check if all channels can be loaded at once.

        Args:
            samples_per_channel: Samples per channel
            num_channels: Number of channels

        Returns:
            True if all can be loaded
        """
        required = self.estimate_channel_memory(samples_per_channel, num_channels)
        return required < self._max_memory

    def get_channel_batches(
        self,
        samples_per_channel: int,
        channel_indices: list[int],
    ) -> list[list[int]]:
        """Get channel batches for sequential processing.

        Args:
            samples_per_channel: Samples per channel
            channel_indices: Channel indices to process

        Returns:
            List of channel batches
        """
        memory_per_channel = samples_per_channel * self._bytes_per_sample

        # Zero-cost channels trivially fit in a single batch; this guard
        # also prevents a ZeroDivisionError when samples_per_channel is 0.
        if memory_per_channel <= 0:
            return [list(channel_indices)] if channel_indices else []

        # Floor division stays exact for arbitrarily large byte counts;
        # float division (int(a / b)) loses precision above 2**53.
        channels_at_once = max(1, self._max_memory // memory_per_channel)

        return [
            channel_indices[i : i + channels_at_once]
            for i in range(0, len(channel_indices), channels_at_once)
        ]

    def suggest_subset(
        self,
        samples_per_channel: int,
        total_channels: int,
    ) -> dict[str, Any]:
        """Suggest channel subset for memory-bounded analysis.

        Args:
            samples_per_channel: Samples per channel
            total_channels: Total available channels

        Returns:
            Suggestion dict
        """
        required = self.estimate_channel_memory(samples_per_channel, total_channels)

        if required < self._max_memory:
            return {
                "can_load_all": True,
                "suggested_channels": list(range(total_channels)),
                "memory_required_gb": required / 1e9,
            }

        # Clamp to at least 1 byte per channel so the division below can
        # never raise ZeroDivisionError in degenerate configurations.
        memory_per_channel = max(1, samples_per_channel * self._bytes_per_sample)
        max_channels = max(1, self._max_memory // memory_per_channel)

        return {
            "can_load_all": False,
            "max_channels_at_once": max_channels,
            "suggested_channels": list(range(min(max_channels, total_channels))),
            "memory_required_gb": required / 1e9,
            "memory_limit_gb": self._max_memory / 1e9,
            "recommendation": (
                f"Process channels in batches of {max_channels}, "
                f"or use --channels 0,1,2,... to select subset"
            ),
        }

    def iterate_channels(
        self,
        samples_per_channel: int,
        channel_indices: list[int],
    ) -> Iterator[list[int]]:
        """Iterate over channel batches.

        Args:
            samples_per_channel: Samples per channel
            channel_indices: Channels to process

        Yields:
            Channel index batches
        """
        yield from self.get_channel_batches(samples_per_channel, channel_indices)