Coverage for src / tracekit / core / memory_monitor.py: 91%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Memory monitoring and OOM prevention for TraceKit. 

2 

3This module provides runtime memory monitoring to prevent out-of-memory crashes 

4and gracefully handle memory exhaustion scenarios. 

5 

6 

7Example: 

8 >>> from tracekit.core.memory_monitor import monitor_memory, MemoryMonitor 

9 >>> with MemoryMonitor('spectrogram', max_memory="4GB") as monitor: 

10 ... for i in range(1000): 

11 ... # Perform work 

12 ... monitor.check(i) # Check memory periodically 

13 ... stats = monitor.get_stats() 

14 >>> print(f"Peak memory: {stats['peak'] / 1e9:.2f} GB") 

15 

16References: 

17 psutil documentation for memory monitoring 

18""" 

19 

20from __future__ import annotations 

21 

22import time 

23from contextlib import contextmanager 

24from dataclasses import dataclass 

25from typing import TYPE_CHECKING, Any 

26 

27from tracekit.config.memory import get_memory_config 

28from tracekit.utils.memory import get_available_memory, get_max_memory 

29 

30if TYPE_CHECKING: 

31 from collections.abc import Callable, Iterator 

32 

33 

34@dataclass 

35class MemorySnapshot: 

36 """Snapshot of memory state at a point in time. 

37 

38 Attributes: 

39 timestamp: Time of snapshot (seconds since epoch). 

40 available: Available system memory (bytes). 

41 process_rss: Process resident set size (bytes). 

42 process_vms: Process virtual memory size (bytes). 

43 pressure: Memory pressure (0.0-1.0). 

44 """ 

45 

46 timestamp: float 

47 available: int 

48 process_rss: int 

49 process_vms: int 

50 pressure: float 

51 

52 

53class MemoryMonitor: 

54 """Context manager for monitoring memory usage during operations. 

55 

56 

57 Monitors memory usage during long-running operations and aborts 

58 before system crashes if memory pressure becomes critical. 

59 

60 Attributes: 

61 operation: Name of the operation being monitored. 

62 max_memory: Maximum allowed memory (None = use global config). 

63 check_interval: How often to check memory (number of iterations). 

64 abort_on_critical: Whether to abort when critical threshold reached. 

65 

66 Example: 

67 >>> with MemoryMonitor('fft', max_memory="2GB") as monitor: 

68 ... result = compute_fft(data) 

69 ... stats = monitor.get_stats() 

70 >>> print(f"Peak: {stats['peak'] / 1e6:.1f} MB") 

71 

72 Raises: 

73 MemoryError: If memory usage approaches critical limit. 

74 """ 

75 

76 def __init__( 

77 self, 

78 operation: str, 

79 *, 

80 max_memory: int | str | None = None, 

81 check_interval: int = 100, 

82 abort_on_critical: bool = True, 

83 ): 

84 """Initialize memory monitor. 

85 

86 Args: 

87 operation: Name of operation being monitored. 

88 max_memory: Maximum memory limit (bytes, string like "4GB", or None for auto). 

89 check_interval: Check memory every N iterations. 

90 abort_on_critical: Abort operation if critical threshold reached. 

91 """ 

92 self.operation = operation 

93 self.check_interval = check_interval 

94 self.abort_on_critical = abort_on_critical 

95 

96 # Parse max_memory 

97 if max_memory is None: 

98 self.max_memory = get_max_memory() 

99 elif isinstance(max_memory, str): 

100 from tracekit.config.memory import _parse_memory_string 

101 

102 self.max_memory = _parse_memory_string(max_memory) 

103 else: 

104 self.max_memory = int(max_memory) 

105 

106 # State 

107 self.start_memory = 0 

108 self.peak_memory = 0 

109 self.current_memory = 0 

110 self._iteration = 0 

111 self._snapshots: list[MemorySnapshot] = [] 

112 self._start_time = 0.0 

113 

114 def __enter__(self) -> MemoryMonitor: 

115 """Enter context and record starting memory.""" 

116 self.start_memory = self._get_process_memory() 

117 self.peak_memory = self.start_memory 

118 self.current_memory = self.start_memory 

119 self._start_time = time.time() 

120 

121 # Take initial snapshot 

122 self._take_snapshot() 

123 

124 return self 

125 

126 def __exit__( 

127 self, 

128 exc_type: type[BaseException] | None, 

129 exc_val: BaseException | None, 

130 exc_tb: Any, 

131 ) -> None: 

132 """Exit context and finalize monitoring.""" 

133 # Note: exc_val and exc_tb intentionally unused but required for Python 3.11+ compatibility 

134 # Take final snapshot 

135 self._take_snapshot() 

136 

137 def check(self, iteration: int | None = None) -> None: 

138 """Check memory usage and raise error if limit approached. 

139 

140 

141 Args: 

142 iteration: Current iteration number (for periodic checking). 

143 

144 Raises: 

145 MemoryError: If memory usage exceeds critical threshold. 

146 

147 Example: 

148 >>> with MemoryMonitor('operation') as monitor: 

149 ... for i in range(10000): 

150 ... # Do work 

151 ... monitor.check(i) # Check every 100 iterations 

152 """ 

153 self._iteration += 1 

154 

155 # Only check periodically to reduce overhead 

156 if iteration is not None and iteration % self.check_interval != 0: 

157 return 

158 

159 self.current_memory = self._get_process_memory() 

160 self.peak_memory = max(self.peak_memory, self.current_memory) 

161 

162 # Check against available memory and thresholds 

163 available = get_available_memory() 

164 config = get_memory_config() 

165 

166 # Calculate pressure 

167 pressure = 1.0 - (available / self.max_memory) if self.max_memory > 0 else 0.0 

168 

169 # Take snapshot if significant time passed 

170 if self._snapshots and (time.time() - self._snapshots[-1].timestamp) > 1.0: 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true

171 self._take_snapshot() 

172 

173 # Check critical threshold 

174 if self.abort_on_critical and pressure >= config.critical_threshold: 

175 raise MemoryError( 

176 f"Critical memory pressure during {self.operation}. " 

177 f"Available: {available / 1e9:.2f} GB, " 

178 f"Pressure: {pressure * 100:.1f}%, " 

179 f"Limit: {self.max_memory / 1e9:.2f} GB. " 

180 f"Operation aborted to prevent system crash. " 

181 f"Suggestion: Reduce dataset size, increase memory limit, " 

182 f"or use chunked processing." 

183 ) 

184 

185 def get_stats(self) -> dict[str, int | float]: 

186 """Get memory statistics for this monitoring session. 

187 

188 Returns: 

189 Dictionary with memory statistics including: 

190 - start: Starting memory (bytes) 

191 - current: Current memory (bytes) 

192 - peak: Peak memory usage (bytes) 

193 - delta: Memory increase since start (bytes) 

194 - duration: Monitoring duration (seconds) 

195 

196 Example: 

197 >>> with MemoryMonitor('operation') as monitor: 

198 ... # ... do work ... 

199 ... stats = monitor.get_stats() 

200 >>> print(f"Peak: {stats['peak'] / 1e6:.1f} MB") 

201 """ 

202 duration = time.time() - self._start_time if self._start_time > 0 else 0.0 

203 

204 return { 

205 "start": self.start_memory, 

206 "current": self.current_memory, 

207 "peak": self.peak_memory, 

208 "delta": self.peak_memory - self.start_memory, 

209 "duration": duration, 

210 } 

211 

212 def get_snapshots(self) -> list[MemorySnapshot]: 

213 """Get all memory snapshots taken during monitoring. 

214 

215 Returns: 

216 List of MemorySnapshot objects. 

217 

218 Example: 

219 >>> with MemoryMonitor('operation') as monitor: 

220 ... # ... work ... 

221 ... pass 

222 >>> for snap in monitor.get_snapshots(): 

223 ... print(f"t={snap.timestamp:.1f}s: {snap.available/1e9:.2f} GB available") 

224 """ 

225 return self._snapshots.copy() 

226 

227 def _get_process_memory(self) -> int: 

228 """Get current process memory usage in bytes. 

229 

230 Returns: 

231 Resident set size (RSS) in bytes. 

232 """ 

233 try: 

234 import psutil 

235 

236 process = psutil.Process() 

237 return process.memory_info().rss # type: ignore[no-any-return] 

238 except ImportError: 

239 # Fallback: estimate from system memory 

240 from tracekit.utils.memory import get_total_memory 

241 

242 return get_total_memory() - get_available_memory() 

243 

244 def _take_snapshot(self) -> None: 

245 """Take a snapshot of current memory state.""" 

246 try: 

247 import psutil 

248 

249 process = psutil.Process() 

250 mem_info = process.memory_info() 

251 available = get_available_memory() 

252 

253 from tracekit.utils.memory import get_memory_pressure 

254 

255 pressure = get_memory_pressure() 

256 

257 snapshot = MemorySnapshot( 

258 timestamp=time.time(), 

259 available=available, 

260 process_rss=mem_info.rss, 

261 process_vms=mem_info.vms, 

262 pressure=pressure, 

263 ) 

264 self._snapshots.append(snapshot) 

265 except ImportError: 

266 # Skip snapshots if psutil not available 

267 pass 

268 

269 

270@contextmanager 

271def monitor_memory( 

272 operation: str, 

273 *, 

274 max_memory: int | str | None = None, 

275 check_interval: int = 100, 

276) -> Iterator[MemoryMonitor]: 

277 """Context manager for monitoring memory usage. 

278 

279 

280 Convenience function that wraps MemoryMonitor. 

281 

282 Args: 

283 operation: Name of operation being monitored. 

284 max_memory: Maximum memory limit. 

285 check_interval: Check memory every N iterations. 

286 

287 Yields: 

288 MemoryMonitor instance. 

289 

290 Example: 

291 >>> with monitor_memory('spectrogram', max_memory="4GB") as mon: 

292 ... for i in range(1000): 

293 ... # Work 

294 ... mon.check(i) 

295 """ 

296 monitor = MemoryMonitor( 

297 operation, 

298 max_memory=max_memory, 

299 check_interval=check_interval, 

300 ) 

301 with monitor: 

302 yield monitor 

303 

304 

305@dataclass 

306class ProgressWithMemory: 

307 """Progress information with memory metrics. 

308 

309 

310 Attributes: 

311 current: Current progress value. 

312 total: Total progress value. 

313 eta_seconds: Estimated time to completion (seconds). 

314 memory_used: Current memory usage (bytes). 

315 memory_peak: Peak memory usage (bytes). 

316 memory_available: Available system memory (bytes). 

317 operation: Name of operation. 

318 """ 

319 

320 current: int 

321 total: int 

322 eta_seconds: float 

323 memory_used: int 

324 memory_peak: int 

325 memory_available: int 

326 operation: str 

327 

328 @property 

329 def percent(self) -> float: 

330 """Progress percentage (0.0-100.0).""" 

331 if self.total == 0: 

332 return 100.0 

333 return (self.current / self.total) * 100.0 

334 

335 @property 

336 def memory_pressure(self) -> float: 

337 """Memory pressure (0.0-1.0).""" 

338 from tracekit.utils.memory import get_memory_pressure 

339 

340 return get_memory_pressure() 

341 

342 def format_progress(self) -> str: 

343 """Format progress as human-readable string. 

344 

345 Returns: 

346 Formatted progress string with memory info. 

347 

348 Example: 

349 >>> progress = ProgressWithMemory(42, 100, 5.0, 1.2e9, 2.1e9, 6e9, "fft") 

350 >>> print(progress.format_progress()) 

351 42.0% | 1.20 GB used | 2.10 GB peak | 6.00 GB avail | ETA 5s 

352 """ 

353 return ( 

354 f"{self.percent:.1f}% | " 

355 f"{self.memory_used / 1e9:.2f} GB used | " 

356 f"{self.memory_peak / 1e9:.2f} GB peak | " 

357 f"{self.memory_available / 1e9:.2f} GB avail | " 

358 f"ETA {self.eta_seconds:.0f}s" 

359 ) 

360 

361 

362class ProgressMonitor: 

363 """Combined progress and memory monitoring. 

364 

365 

366 Tracks both operation progress and memory usage, providing 

367 unified progress updates with memory metrics. 

368 

369 Example: 

370 >>> monitor = ProgressMonitor('spectrogram', total=1000) 

371 >>> for i in range(1000): 

372 ... # Work 

373 ... monitor.update(i) 

374 ... if i % 100 == 0: 

375 ... progress = monitor.get_progress() 

376 ... print(progress.format_progress()) 

377 """ 

378 

379 def __init__( 

380 self, 

381 operation: str, 

382 total: int, 

383 *, 

384 callback: Callable[[ProgressWithMemory], None] | None = None, 

385 update_interval: int = 1, 

386 ): 

387 """Initialize progress monitor. 

388 

389 Args: 

390 operation: Name of operation. 

391 total: Total number of items to process. 

392 callback: Optional callback function called on each update. 

393 update_interval: Call callback every N updates. 

394 """ 

395 self.operation = operation 

396 self.total = total 

397 self.callback = callback 

398 self.update_interval = update_interval 

399 self.current = 0 

400 self._start_time = time.time() 

401 self._memory_monitor = MemoryMonitor(operation, check_interval=1) 

402 self._update_count = 0 

403 

404 def update(self, current: int | None = None) -> None: 

405 """Update progress. 

406 

407 Args: 

408 current: Current progress value (if None, increments by 1). 

409 """ 

410 if current is not None: 

411 self.current = current 

412 else: 

413 self.current += 1 

414 

415 self._update_count += 1 

416 

417 # Check memory 

418 self._memory_monitor.check(self._update_count) 

419 

420 # Call callback if interval reached 

421 if self.callback and self._update_count % self.update_interval == 0: 

422 progress = self.get_progress() 

423 self.callback(progress) 

424 

425 def get_progress(self) -> ProgressWithMemory: 

426 """Get current progress with memory metrics. 

427 

428 Returns: 

429 ProgressWithMemory instance. 

430 """ 

431 elapsed = time.time() - self._start_time 

432 eta = elapsed / self.current * (self.total - self.current) if self.current > 0 else 0.0 

433 

434 stats = self._memory_monitor.get_stats() 

435 

436 return ProgressWithMemory( 

437 current=self.current, 

438 total=self.total, 

439 eta_seconds=eta, 

440 memory_used=stats["current"], # type: ignore[arg-type] 

441 memory_peak=stats["peak"], # type: ignore[arg-type] 

442 memory_available=get_available_memory(), 

443 operation=self.operation, 

444 ) 

445 

446 

447__all__ = [ 

448 "MemoryMonitor", 

449 "MemorySnapshot", 

450 "ProgressMonitor", 

451 "ProgressWithMemory", 

452 "monitor_memory", 

453]