Coverage for src / tracekit / utils / memory_extensions.py: 70%
144 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Extended memory management utilities for TraceKit.
3Additional memory management features including context managers,
4LRU caching, and HDF5 lazy loading support.
7Example:
8 >>> from tracekit.utils.memory_extensions import ResourceManager, LRUCache
9 >>> with ResourceManager(large_array) as data:
10 ... result = process(data)
11 >>> # Data automatically cleaned up
13References:
14 Python resource management patterns
15 functools.lru_cache documentation
16"""
18from __future__ import annotations
20import gc
21import hashlib
22import os
23import time
24from collections import OrderedDict
25from typing import TYPE_CHECKING, Any, TypeVar
27import numpy as np
29if TYPE_CHECKING:
30 from collections.abc import Callable
32 from numpy.typing import NDArray
34T = TypeVar("T")
37# =============================================================================
38# Context Managers for Resource Cleanup (MEM-019)
39# =============================================================================
class ResourceManager:
    """Context manager that releases a large resource promptly on exit.

    Holds an arbitrary resource (array, file handle, etc.) for the duration
    of a ``with`` block; on exit it runs an optional cleanup hook, drops the
    reference, and triggers garbage collection so memory is reclaimed early.

    Args:
        resource: Resource to manage (array, file handle, etc.).
        cleanup_func: Optional cleanup function to call on exit.

    Example:
        >>> import numpy as np
        >>> with ResourceManager(np.zeros(1000000)) as data:
        ...     result = process(data)
        >>> # Data is automatically released

    References:
        MEM-019: Explicit Resource Cleanup
    """

    def __init__(
        self,
        resource: Any,
        cleanup_func: Callable[[Any], None] | None = None,
    ) -> None:
        self._resource = resource
        self._cleanup_func = cleanup_func

    def __enter__(self) -> Any:
        """Enter the context and hand the managed resource to the caller."""
        return self._resource

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Run the cleanup hook, drop the reference, and collect garbage."""
        # exc_* parameters are required by the context-manager protocol but
        # deliberately ignored: cleanup happens on success and failure alike.
        hook = self._cleanup_func
        if hook is not None:
            hook(self._resource)

        # Drop our reference so the object can be reclaimed...
        self._resource = None
        # ...and nudge the collector to reclaim it now rather than later.
        gc.collect()
class ArrayManager(ResourceManager):
    """Context manager specifically for numpy arrays.

    On exit the array reference is dropped and garbage collection is forced
    (see ResourceManager.__exit__), letting the backing memory be reclaimed.

    Example:
        >>> import numpy as np
        >>> with ArrayManager(np.zeros((10000, 10000))) as arr:
        ...     result = np.sum(arr)
        >>> # Array memory is released
    """

    def __init__(self, array: NDArray[Any]) -> None:
        # No cleanup hook needed: numpy arrays require no explicit teardown,
        # so dropping the reference in ResourceManager.__exit__ is enough.
        # (Previously a no-op lambda was passed, which added nothing.)
        super().__init__(array)
100# =============================================================================
101# LRU Cache for Intermediate Results (MEM-021, MEM-029)
102# =============================================================================
105class LRUCache[T]:
106 """Least-Recently-Used cache with memory-based eviction.
108 Caches intermediate results with automatic eviction when
109 memory limit is exceeded.
111 Args:
112 max_memory_bytes: Maximum cache size in bytes.
113 max_entries: Maximum number of cache entries (default unlimited).
115 Example:
116 >>> cache = LRUCache(max_memory_bytes=1_000_000_000) # 1 GB
117 >>> cache.put("key1", large_array, size_bytes=800_000_000)
118 >>> result = cache.get("key1")
119 >>> cache.clear()
121 References:
122 MEM-021: Intermediate Result Eviction
123 MEM-029: LRU Cache for Intermediate Results
124 """
126 def __init__(
127 self,
128 max_memory_bytes: int,
129 max_entries: int | None = None,
130 ) -> None:
131 self._max_memory = max_memory_bytes
132 self._max_entries = max_entries
133 self._cache: OrderedDict[str, tuple[T, int, float]] = OrderedDict()
134 self._current_memory: int = 0
135 self._hits: int = 0
136 self._misses: int = 0
138 def get(self, key: str) -> T | None:
139 """Get cached value by key.
141 Args:
142 key: Cache key.
144 Returns:
145 Cached value or None if not found.
146 """
147 if key in self._cache:
148 # Move to end (most recently used)
149 value, size, _ = self._cache.pop(key)
150 self._cache[key] = (value, size, time.time())
151 self._hits += 1
152 return value
153 else:
154 self._misses += 1
155 return None
157 def put(self, key: str, value: T, size_bytes: int | None = None) -> None:
158 """Put value in cache.
160 Args:
161 key: Cache key.
162 value: Value to cache.
163 size_bytes: Size in bytes. If None, estimated from value.
164 """
165 # Estimate size if not provided
166 if size_bytes is None:
167 size_bytes = self._estimate_size(value)
169 # Check if single item exceeds max memory
170 if size_bytes > self._max_memory: 170 ↛ 172line 170 didn't jump to line 172 because the condition on line 170 was never true
171 # Don't cache items larger than max memory
172 return
174 # Evict until we have space
175 while (
176 self._current_memory + size_bytes > self._max_memory
177 or (self._max_entries and len(self._cache) >= self._max_entries)
178 ) and len(self._cache) > 0:
179 self._evict_oldest()
181 # Remove if key already exists
182 if key in self._cache: 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true
183 _, old_size, _ = self._cache.pop(key)
184 self._current_memory -= old_size
186 # Add new entry
187 self._cache[key] = (value, size_bytes, time.time())
188 self._current_memory += size_bytes
190 def _evict_oldest(self) -> None:
191 """Evict least recently used entry."""
192 if len(self._cache) > 0: 192 ↛ exitline 192 didn't return from function '_evict_oldest' because the condition on line 192 was always true
193 _, (_, size, _) = self._cache.popitem(last=False)
194 self._current_memory -= size
196 def clear(self) -> None:
197 """Clear entire cache."""
198 self._cache.clear()
199 self._current_memory = 0
201 def _estimate_size(self, value: T) -> int:
202 """Estimate size of cached value."""
203 if isinstance(value, np.ndarray): 203 ↛ 205line 203 didn't jump to line 205 because the condition on line 203 was always true
204 return int(value.nbytes)
205 elif isinstance(value, list | tuple):
206 # Rough estimate for sequences
207 return sum(self._estimate_size(item) for item in value)
208 elif isinstance(value, dict):
209 return sum(self._estimate_size(k) + self._estimate_size(v) for k, v in value.items())
210 else:
211 # Fallback: assume 1KB for unknown types
212 return 1024
214 def stats(self) -> dict[str, int | float]:
215 """Get cache statistics.
217 Returns:
218 Dictionary with cache stats.
219 """
220 total_requests = self._hits + self._misses
221 hit_rate = self._hits / total_requests if total_requests > 0 else 0.0
223 return {
224 "entries": len(self._cache),
225 "memory_bytes": self._current_memory,
226 "memory_mb": self._current_memory / (1024 * 1024),
227 "hits": self._hits,
228 "misses": self._misses,
229 "hit_rate": hit_rate,
230 }
232 def __len__(self) -> int:
233 """Number of cached entries."""
234 return len(self._cache)
# Module-level singleton backing the global result cache.
_result_cache: LRUCache[Any] | None = None


def get_result_cache() -> LRUCache[Any]:
    """Return the process-wide result cache, creating it on first use.

    The cache budget is read from the TK_CACHE_SIZE environment variable
    (bytes); when unset it defaults to 2 GB.

    Returns:
        Global LRU cache instance.

    Example:
        >>> cache = get_result_cache()
        >>> cache.put("fft_result", fft_data, size_bytes=8000000)
        >>> result = cache.get("fft_result")
    """
    global _result_cache
    if _result_cache is None:
        default_bytes = 2 * 1024 * 1024 * 1024  # 2 GB
        configured = os.environ.get("TK_CACHE_SIZE", default_bytes)  # noqa: PLW1508
        _result_cache = LRUCache(max_memory_bytes=int(configured))
    return _result_cache
def clear_cache() -> None:
    """Empty the global result cache, releasing every cached entry.

    Example:
        >>> clear_cache()
        >>> # All cached results released
    """
    get_result_cache().clear()
def show_cache_stats() -> dict[str, int | float]:
    """Report statistics for the global result cache.

    Returns:
        Dictionary with cache statistics (entries, memory, hit rate, ...).

    Example:
        >>> stats = show_cache_stats()
        >>> print(f"Hit rate: {stats['hit_rate']*100:.1f}%")
        >>> print(f"Memory used: {stats['memory_mb']:.1f} MB")
    """
    return get_result_cache().stats()
def cache_key(*args: Any, **kwargs: Any) -> str:
    """Generate a deterministic cache key from arguments.

    Positional arguments are stringified in order; keyword arguments are
    appended as ``name=value`` in sorted-key order, so the key is stable
    regardless of keyword ordering at the call site.

    Args:
        *args: Positional arguments.
        **kwargs: Keyword arguments.

    Returns:
        Hash-based cache key (32-char hex digest).

    Example:
        >>> key = cache_key("fft", samples=1000, nfft=2048)
        >>> # Use key for caching
    """
    pieces = [str(item) for item in args]
    for name in sorted(kwargs):
        pieces.append(f"{name}={kwargs[name]}")

    # MD5 here is a non-cryptographic fingerprint for cache bucketing.
    return hashlib.md5("|".join(pieces).encode()).hexdigest()
309# =============================================================================
310# HDF5 Lazy Loading (MEM-017)
311# =============================================================================
def load_hdf5_lazy(
    file_path: str,
    dataset_path: str = "/data",
) -> Any:
    """Load HDF5 dataset as lazy h5py.Dataset (not fully in memory).

    Enables partial loading via slicing without loading entire dataset.

    Args:
        file_path: Path to HDF5 file.
        dataset_path: Path to dataset within file (default "/data").

    Returns:
        h5py.Dataset object (lazy, not loaded until accessed).

    Raises:
        ImportError: If h5py is not available.
        FileNotFoundError: If file does not exist.
        KeyError: If dataset not found in file.

    Example:
        >>> # Load dataset lazily
        >>> dataset = load_hdf5_lazy("large_file.h5", "/signals/ch1")
        >>> # Only load specific range (not entire file)
        >>> chunk = dataset[1000:2000]
        >>> print(f"Chunk shape: {chunk.shape}")

    References:
        MEM-017: HDF5 Chunked Dataset Access
    """
    try:
        import h5py
    except ImportError as err:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "h5py required for lazy HDF5 loading. Install with: pip install h5py"
        ) from err

    from pathlib import Path

    if not Path(file_path).exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    # The file handle is intentionally left open: the returned dataset is
    # lazy and reads from this handle on access. The caller owns closing it
    # (e.g. via dataset.file.close()).
    f = h5py.File(file_path, "r")

    if dataset_path not in f:
        available = list(f.keys())
        f.close()
        raise KeyError(
            f"Dataset '{dataset_path}' not found in HDF5 file. "
            f"Available datasets: {', '.join(available)}"
        )

    return f[dataset_path]
class LazyHDF5Array:
    """Wrapper for lazy HDF5 dataset access with context management.

    Provides automatic file handle cleanup and numpy-like slicing. All
    data access must happen inside a ``with`` block; outside of one, the
    accessors raise RuntimeError.

    Args:
        file_path: Path to HDF5 file.
        dataset_path: Path to dataset within file.

    Example:
        >>> with LazyHDF5Array("data.h5", "/signals/ch1") as arr:
        ...     # Only loads specific slice
        ...     chunk = arr[1000:2000]
        ...     print(f"Shape: {arr.shape}, dtype: {arr.dtype}")
        >>> # File automatically closed

    References:
        MEM-017: HDF5 Chunked Dataset Access
        MEM-019: Explicit Resource Cleanup
    """

    def __init__(self, file_path: str, dataset_path: str = "/data"):
        self._file_path = file_path
        self._dataset_path = dataset_path
        # Both are populated by __enter__ and cleared by __exit__.
        self._file = None
        self._dataset = None

    def _require_open(self) -> Any:
        """Return the bound dataset, failing if used outside a with-block."""
        if self._dataset is None:
            raise RuntimeError("LazyHDF5Array must be used as context manager")
        return self._dataset

    def __enter__(self) -> LazyHDF5Array:
        """Open the HDF5 file and bind the target dataset."""
        try:
            import h5py
        except ImportError:
            raise ImportError("h5py required. Install with: pip install h5py")  # noqa: B904

        self._file = h5py.File(self._file_path, "r")
        self._dataset = self._file[self._dataset_path]
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Close the HDF5 file and drop the dataset reference."""
        # exc_* are required by the protocol but unused: cleanup is
        # unconditional whether or not the body raised.
        handle = self._file
        if handle is not None:
            handle.close()
            self._file = None
            self._dataset = None

    def __getitem__(self, key: Any) -> NDArray[Any]:
        """Slice the dataset (only the requested region is loaded)."""
        return np.asarray(self._require_open()[key])

    @property
    def shape(self) -> tuple[int, ...]:
        """Dataset shape."""
        return self._require_open().shape

    @property
    def dtype(self) -> np.dtype[Any]:
        """Dataset dtype."""
        return self._require_open().dtype

    @property
    def size(self) -> int:
        """Total number of elements."""
        return self._require_open().size

    def __len__(self) -> int:
        """Length of the first dimension."""
        return len(self._require_open())
# Public API of this module: resource managers, the LRU cache and its
# global-cache helpers, and the HDF5 lazy-loading utilities.
__all__ = [
    "ArrayManager",
    "LRUCache",
    "LazyHDF5Array",
    "ResourceManager",
    "cache_key",
    "clear_cache",
    "get_result_cache",
    "load_hdf5_lazy",
    "show_cache_stats",
]