Coverage for src / tracekit / utils / memory_extensions.py: 70%

144 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Extended memory management utilities for TraceKit. 

2 

3Additional memory management features including context managers, 

4LRU caching, and HDF5 lazy loading support. 

5 

6 

7Example: 

8 >>> from tracekit.utils.memory_extensions import ResourceManager, LRUCache 

9 >>> with ResourceManager(large_array) as data: 

10 ... result = process(data) 

11 >>> # Data automatically cleaned up 

12 

13References: 

14 Python resource management patterns 

15 functools.lru_cache documentation 

16""" 

17 

18from __future__ import annotations 

19 

20import gc 

21import hashlib 

22import os 

23import time 

24from collections import OrderedDict 

25from typing import TYPE_CHECKING, Any, TypeVar 

26 

27import numpy as np 

28 

29if TYPE_CHECKING: 

30 from collections.abc import Callable 

31 

32 from numpy.typing import NDArray 

33 

# NOTE(review): `T` appears unused — `LRUCache` below declares its own type
# parameter via PEP 695 (`class LRUCache[T]:`). Kept in case external code
# imports it from this module; confirm before removing.
T = TypeVar("T")

35 

36 

37# ============================================================================= 

38# Context Managers for Resource Cleanup (MEM-019) 

39# ============================================================================= 

40 

41 

class ResourceManager:
    """Context manager that releases a large resource on exit.

    Guarantees prompt memory release once work on a large dataset is
    finished: an optional cleanup callback runs, the reference is dropped,
    and a garbage-collection pass reclaims the memory immediately.

    Args:
        resource: Resource to manage (array, file handle, etc.).
        cleanup_func: Optional cleanup function to call on exit.

    Example:
        >>> import numpy as np
        >>> with ResourceManager(np.zeros(1000000)) as data:
        ...     result = process(data)
        >>> # Data is automatically released

    References:
        MEM-019: Explicit Resource Cleanup
    """

    def __init__(
        self,
        resource: Any,
        cleanup_func: Callable[[Any], None] | None = None,
    ) -> None:
        self._resource = resource
        self._cleanup_func = cleanup_func

    def __enter__(self) -> Any:
        """Hand the managed resource to the ``with`` body."""
        return self._resource

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Run cleanup, drop the reference, and force a GC pass."""
        # exc_* parameters are required by the context-manager protocol
        # but are intentionally unused here.
        callback = self._cleanup_func
        if callback is not None:
            callback(self._resource)

        # Drop our reference so the object becomes collectable...
        self._resource = None
        # ...and reclaim it right away instead of waiting for the GC.
        gc.collect()

84 

85 

class ArrayManager(ResourceManager):
    """Context manager specifically for numpy arrays.

    No custom cleanup callback is needed: the base class already drops its
    reference on exit and forces a garbage-collection pass, which releases
    the array's memory.

    Example:
        >>> import numpy as np
        >>> with ArrayManager(np.zeros((10000, 10000))) as arr:
        ...     result = np.sum(arr)
        >>> # Array memory is released
    """

    def __init__(self, array: NDArray[Any]) -> None:
        # Previously passed a no-op ``lambda x: None`` as cleanup_func
        # (plus a noqa to silence the unused-arg lint). Omitting it is
        # behaviorally equivalent — the base class skips cleanup when the
        # callback is None — and removes the pointless indirection.
        super().__init__(array)

98 

99 

100# ============================================================================= 

101# LRU Cache for Intermediate Results (MEM-021, MEM-029) 

102# ============================================================================= 

103 

104 

105class LRUCache[T]: 

106 """Least-Recently-Used cache with memory-based eviction. 

107 

108 Caches intermediate results with automatic eviction when 

109 memory limit is exceeded. 

110 

111 Args: 

112 max_memory_bytes: Maximum cache size in bytes. 

113 max_entries: Maximum number of cache entries (default unlimited). 

114 

115 Example: 

116 >>> cache = LRUCache(max_memory_bytes=1_000_000_000) # 1 GB 

117 >>> cache.put("key1", large_array, size_bytes=800_000_000) 

118 >>> result = cache.get("key1") 

119 >>> cache.clear() 

120 

121 References: 

122 MEM-021: Intermediate Result Eviction 

123 MEM-029: LRU Cache for Intermediate Results 

124 """ 

125 

126 def __init__( 

127 self, 

128 max_memory_bytes: int, 

129 max_entries: int | None = None, 

130 ) -> None: 

131 self._max_memory = max_memory_bytes 

132 self._max_entries = max_entries 

133 self._cache: OrderedDict[str, tuple[T, int, float]] = OrderedDict() 

134 self._current_memory: int = 0 

135 self._hits: int = 0 

136 self._misses: int = 0 

137 

138 def get(self, key: str) -> T | None: 

139 """Get cached value by key. 

140 

141 Args: 

142 key: Cache key. 

143 

144 Returns: 

145 Cached value or None if not found. 

146 """ 

147 if key in self._cache: 

148 # Move to end (most recently used) 

149 value, size, _ = self._cache.pop(key) 

150 self._cache[key] = (value, size, time.time()) 

151 self._hits += 1 

152 return value 

153 else: 

154 self._misses += 1 

155 return None 

156 

157 def put(self, key: str, value: T, size_bytes: int | None = None) -> None: 

158 """Put value in cache. 

159 

160 Args: 

161 key: Cache key. 

162 value: Value to cache. 

163 size_bytes: Size in bytes. If None, estimated from value. 

164 """ 

165 # Estimate size if not provided 

166 if size_bytes is None: 

167 size_bytes = self._estimate_size(value) 

168 

169 # Check if single item exceeds max memory 

170 if size_bytes > self._max_memory: 170 ↛ 172line 170 didn't jump to line 172 because the condition on line 170 was never true

171 # Don't cache items larger than max memory 

172 return 

173 

174 # Evict until we have space 

175 while ( 

176 self._current_memory + size_bytes > self._max_memory 

177 or (self._max_entries and len(self._cache) >= self._max_entries) 

178 ) and len(self._cache) > 0: 

179 self._evict_oldest() 

180 

181 # Remove if key already exists 

182 if key in self._cache: 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true

183 _, old_size, _ = self._cache.pop(key) 

184 self._current_memory -= old_size 

185 

186 # Add new entry 

187 self._cache[key] = (value, size_bytes, time.time()) 

188 self._current_memory += size_bytes 

189 

190 def _evict_oldest(self) -> None: 

191 """Evict least recently used entry.""" 

192 if len(self._cache) > 0: 192 ↛ exitline 192 didn't return from function '_evict_oldest' because the condition on line 192 was always true

193 _, (_, size, _) = self._cache.popitem(last=False) 

194 self._current_memory -= size 

195 

196 def clear(self) -> None: 

197 """Clear entire cache.""" 

198 self._cache.clear() 

199 self._current_memory = 0 

200 

201 def _estimate_size(self, value: T) -> int: 

202 """Estimate size of cached value.""" 

203 if isinstance(value, np.ndarray): 203 ↛ 205line 203 didn't jump to line 205 because the condition on line 203 was always true

204 return int(value.nbytes) 

205 elif isinstance(value, list | tuple): 

206 # Rough estimate for sequences 

207 return sum(self._estimate_size(item) for item in value) 

208 elif isinstance(value, dict): 

209 return sum(self._estimate_size(k) + self._estimate_size(v) for k, v in value.items()) 

210 else: 

211 # Fallback: assume 1KB for unknown types 

212 return 1024 

213 

214 def stats(self) -> dict[str, int | float]: 

215 """Get cache statistics. 

216 

217 Returns: 

218 Dictionary with cache stats. 

219 """ 

220 total_requests = self._hits + self._misses 

221 hit_rate = self._hits / total_requests if total_requests > 0 else 0.0 

222 

223 return { 

224 "entries": len(self._cache), 

225 "memory_bytes": self._current_memory, 

226 "memory_mb": self._current_memory / (1024 * 1024), 

227 "hits": self._hits, 

228 "misses": self._misses, 

229 "hit_rate": hit_rate, 

230 } 

231 

232 def __len__(self) -> int: 

233 """Number of cached entries.""" 

234 return len(self._cache) 

235 

236 

# Global result cache singleton; created lazily by get_result_cache().
_result_cache: LRUCache[Any] | None = None

239 

240 

def get_result_cache() -> LRUCache[Any]:
    """Return the process-wide result cache, creating it on first use.

    Returns:
        Global LRU cache instance.

    Example:
        >>> cache = get_result_cache()
        >>> cache.put("fft_result", fft_data, size_bytes=8000000)
        >>> result = cache.get("fft_result")
    """
    global _result_cache
    if _result_cache is None:
        # Capacity defaults to 2 GB; overridable via the TK_CACHE_SIZE
        # environment variable (value in bytes).
        default_bytes = 2 * 1024 * 1024 * 1024
        max_cache_size = int(os.environ.get("TK_CACHE_SIZE", default_bytes))  # noqa: PLW1508
        _result_cache = LRUCache(max_memory_bytes=max_cache_size)
    return _result_cache

258 

259 

def clear_cache() -> None:
    """Empty the global result cache, releasing all cached entries.

    Example:
        >>> clear_cache()
        >>> # All cached results released
    """
    get_result_cache().clear()

269 

270 

def show_cache_stats() -> dict[str, int | float]:
    """Report statistics for the global result cache.

    Returns:
        Dictionary with cache statistics.

    Example:
        >>> stats = show_cache_stats()
        >>> print(f"Hit rate: {stats['hit_rate']*100:.1f}%")
        >>> print(f"Memory used: {stats['memory_mb']:.1f} MB")
    """
    return get_result_cache().stats()

284 

285 

def cache_key(*args: Any, **kwargs: Any) -> str:
    """Build a deterministic cache key from arbitrary arguments.

    Args:
        *args: Positional arguments.
        **kwargs: Keyword arguments.

    Returns:
        Hash-based cache key.

    Example:
        >>> key = cache_key("fft", samples=1000, nfft=2048)
        >>> # Use key for caching
    """
    # Deterministic textual form: positional args in order, then keyword
    # args sorted by name so call-site ordering does not matter.
    pieces: list[str] = [str(a) for a in args]
    for name in sorted(kwargs):
        pieces.append(f"{name}={kwargs[name]}")
    joined = "|".join(pieces)

    # MD5 yields a short, stable hexadecimal key (not a security use).
    return hashlib.md5(joined.encode()).hexdigest()

307 

308 

309# ============================================================================= 

310# HDF5 Lazy Loading (MEM-017) 

311# ============================================================================= 

312 

313 

def load_hdf5_lazy(
    file_path: str,
    dataset_path: str = "/data",
) -> Any:
    """Open an HDF5 dataset lazily as an h5py.Dataset (no full load).

    Slicing the returned object reads only the requested range from disk.

    Args:
        file_path: Path to HDF5 file.
        dataset_path: Path to dataset within file (default "/data").

    Returns:
        h5py.Dataset object (lazy, not loaded until accessed).

    Raises:
        ImportError: If h5py is not available.
        FileNotFoundError: If file does not exist.
        KeyError: If dataset not found in file.

    Example:
        >>> dataset = load_hdf5_lazy("large_file.h5", "/signals/ch1")
        >>> chunk = dataset[1000:2000]  # loads only this range
        >>> print(f"Chunk shape: {chunk.shape}")

    References:
        MEM-017: HDF5 Chunked Dataset Access
    """
    try:
        import h5py
    except ImportError:
        raise ImportError(  # noqa: B904
            "h5py required for lazy HDF5 loading. Install with: pip install h5py"
        )

    from pathlib import Path

    if not Path(file_path).exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    # The handle is intentionally left open: the returned dataset reads
    # through it lazily, so the CALLER owns closing the file.
    handle = h5py.File(file_path, "r")

    if dataset_path in handle:
        return handle[dataset_path]

    available = list(handle.keys())
    handle.close()
    raise KeyError(
        f"Dataset '{dataset_path}' not found in HDF5 file. "
        f"Available datasets: {', '.join(available)}"
    )

373 

374 

class LazyHDF5Array:
    """Wrapper for lazy HDF5 dataset access with context management.

    Provides automatic file handle cleanup and numpy-like slicing.
    All accessors fail with RuntimeError outside a ``with`` block.

    Args:
        file_path: Path to HDF5 file.
        dataset_path: Path to dataset within file.

    Example:
        >>> with LazyHDF5Array("data.h5", "/signals/ch1") as arr:
        ...     chunk = arr[1000:2000]  # loads only this slice
        ...     print(f"Shape: {arr.shape}, dtype: {arr.dtype}")
        >>> # File automatically closed

    References:
        MEM-017: HDF5 Chunked Dataset Access
        MEM-019: Explicit Resource Cleanup
    """

    def __init__(self, file_path: str, dataset_path: str = "/data"):
        self._file_path = file_path
        self._dataset_path = dataset_path
        self._file = None
        self._dataset = None

    def _require_dataset(self) -> Any:
        """Return the open dataset, or fail if outside a ``with`` block."""
        if self._dataset is None:
            raise RuntimeError("LazyHDF5Array must be used as context manager")
        return self._dataset

    def __enter__(self) -> LazyHDF5Array:
        """Open the HDF5 file and bind the target dataset."""
        try:
            import h5py
        except ImportError:
            raise ImportError("h5py required. Install with: pip install h5py")  # noqa: B904

        self._file = h5py.File(self._file_path, "r")
        self._dataset = self._file[self._dataset_path]  # type: ignore[index]
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Close the HDF5 file and drop the dataset reference."""
        # exc_* are required by the protocol but intentionally unused.
        if self._file is not None:
            self._file.close()
            self._file = None
            self._dataset = None

    def __getitem__(self, key: Any) -> NDArray[Any]:
        """Slice the dataset (triggers a partial load of that range only)."""
        return np.asarray(self._require_dataset()[key])

    @property
    def shape(self) -> tuple[int, ...]:
        """Dataset shape."""
        return self._require_dataset().shape

    @property
    def dtype(self) -> np.dtype[Any]:
        """Dataset dtype."""
        return self._require_dataset().dtype

    @property
    def size(self) -> int:
        """Total number of elements."""
        return self._require_dataset().size

    def __len__(self) -> int:
        """Length of first dimension."""
        return len(self._require_dataset())

453 

454 

455__all__ = [ 

456 "ArrayManager", 

457 "LRUCache", 

458 "LazyHDF5Array", 

459 "ResourceManager", 

460 "cache_key", 

461 "clear_cache", 

462 "get_result_cache", 

463 "load_hdf5_lazy", 

464 "show_cache_stats", 

465]