Coverage for src / tracekit / core / memory_monitor.py: 91%
123 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Memory monitoring and OOM prevention for TraceKit.
3This module provides runtime memory monitoring to prevent out-of-memory crashes
4and gracefully handle memory exhaustion scenarios.
7Example:
8 >>> from tracekit.core.memory_monitor import monitor_memory, MemoryMonitor
9 >>> with MemoryMonitor('spectrogram', max_memory="4GB") as monitor:
10 ... for i in range(1000):
11 ... # Perform work
12 ... monitor.check(i) # Check memory periodically
13 ... stats = monitor.get_stats()
14 >>> print(f"Peak memory: {stats['peak'] / 1e9:.2f} GB")
16References:
17 psutil documentation for memory monitoring
18"""
20from __future__ import annotations
22import time
23from contextlib import contextmanager
24from dataclasses import dataclass
25from typing import TYPE_CHECKING, Any
27from tracekit.config.memory import get_memory_config
28from tracekit.utils.memory import get_available_memory, get_max_memory
30if TYPE_CHECKING:
31 from collections.abc import Callable, Iterator
@dataclass
class MemorySnapshot:
    """Point-in-time record of system and process memory.

    Instances are plain value objects; the monitor appends one to its
    history each time it samples memory.

    Attributes:
        timestamp: When the sample was taken (seconds since epoch).
        available: System memory still available (bytes).
        process_rss: Resident set size of the process (bytes).
        process_vms: Virtual memory size of the process (bytes).
        pressure: Memory pressure at sample time (0.0-1.0).
    """

    # Wall-clock time of the sample.
    timestamp: float
    # System-wide figure.
    available: int
    # Per-process figures.
    process_rss: int
    process_vms: int
    # Pressure value recorded alongside the raw byte counts.
    pressure: float
class MemoryMonitor:
    """Context manager for monitoring memory usage during operations.

    Entering the context records the starting process memory and takes an
    initial snapshot. Each :meth:`check` call refreshes the current/peak
    figures and, when ``abort_on_critical`` is set, raises ``MemoryError``
    once the computed pressure reaches the configured critical threshold.
    Leaving the context takes a final snapshot.

    Attributes:
        operation: Name of the operation being monitored.
        max_memory: Memory limit in bytes. When ``None`` is passed to the
            constructor the value returned by ``get_max_memory()`` is used.
        check_interval: How often to check memory (number of iterations).
        abort_on_critical: Whether to abort when critical threshold reached.
        start_memory: Process memory recorded by ``__enter__`` (bytes).
        peak_memory: Highest process memory observed so far (bytes).
        current_memory: Process memory at the most recent check (bytes).

    Example:
        >>> with MemoryMonitor('fft', max_memory="2GB") as monitor:
        ...     result = compute_fft(data)
        ...     stats = monitor.get_stats()
        >>> print(f"Peak: {stats['peak'] / 1e6:.1f} MB")

    Raises:
        MemoryError: From :meth:`check`, if memory pressure reaches the
            critical threshold and ``abort_on_critical`` is true.
    """

    def __init__(
        self,
        operation: str,
        *,
        max_memory: int | str | None = None,
        check_interval: int = 100,
        abort_on_critical: bool = True,
    ):
        """Initialize memory monitor.

        Args:
            operation: Name of operation being monitored.
            max_memory: Maximum memory limit (bytes, string like "4GB", or None for auto).
            check_interval: Check memory every N iterations. A value of 0 is
                treated by :meth:`check` as "check on every call".
            abort_on_critical: Abort operation if critical threshold reached.
        """
        self.operation = operation
        self.check_interval = check_interval
        self.abort_on_critical = abort_on_critical

        # Resolve the limit to an integer number of bytes.
        if max_memory is None:
            self.max_memory = get_max_memory()
        elif isinstance(max_memory, str):
            from tracekit.config.memory import _parse_memory_string

            self.max_memory = _parse_memory_string(max_memory)
        else:
            self.max_memory = int(max_memory)

        # Mutable monitoring state; the memory/time fields are populated by
        # __enter__ and stay at zero if the monitor is used without ``with``.
        self.start_memory = 0
        self.peak_memory = 0
        self.current_memory = 0
        self._iteration = 0
        self._snapshots: list[MemorySnapshot] = []
        self._start_time = 0.0

    def __enter__(self) -> MemoryMonitor:
        """Enter context and record starting memory."""
        self.start_memory = self._get_process_memory()
        self.peak_memory = self.start_memory
        self.current_memory = self.start_memory
        self._start_time = time.time()

        # Take initial snapshot
        self._take_snapshot()

        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any,
    ) -> None:
        """Exit context and take a final snapshot.

        The exception arguments are part of the context-manager protocol and
        are not inspected; returning ``None`` lets any exception propagate.
        """
        self._take_snapshot()

    def check(self, iteration: int | None = None) -> None:
        """Check memory usage and raise error if limit approached.

        Args:
            iteration: Current iteration number (for periodic checking).
                When given, the memory query only runs on multiples of
                ``check_interval``; when ``None``, it runs on every call.

        Raises:
            MemoryError: If memory pressure reaches the critical threshold
                and ``abort_on_critical`` is true.

        Example:
            >>> with MemoryMonitor('operation') as monitor:
            ...     for i in range(10000):
            ...         # Do work
            ...         monitor.check(i)  # Check every 100 iterations
        """
        self._iteration += 1

        # Only check periodically to reduce overhead. An interval of 0 cannot
        # be used as a modulus, so it means "check on every call" instead of
        # raising ZeroDivisionError.
        interval = self.check_interval
        if iteration is not None and interval != 0 and iteration % interval != 0:
            return

        self.current_memory = self._get_process_memory()
        self.peak_memory = max(self.peak_memory, self.current_memory)

        # Check against available memory and thresholds
        available = get_available_memory()
        config = get_memory_config()

        # Pressure is the share of the max_memory budget that is NOT currently
        # available system-wide.
        # NOTE(review): this uses system-available memory rather than this
        # process's RSS, and goes negative when more than max_memory is
        # available -- confirm that is the intended definition.
        pressure = 1.0 - (available / self.max_memory) if self.max_memory > 0 else 0.0

        # Record a snapshot at most about once per second, and only when a
        # previous snapshot exists to measure the elapsed time against.
        if self._snapshots and (time.time() - self._snapshots[-1].timestamp) > 1.0:
            self._take_snapshot()

        # Check critical threshold
        if self.abort_on_critical and pressure >= config.critical_threshold:
            raise MemoryError(
                f"Critical memory pressure during {self.operation}. "
                f"Available: {available / 1e9:.2f} GB, "
                f"Pressure: {pressure * 100:.1f}%, "
                f"Limit: {self.max_memory / 1e9:.2f} GB. "
                f"Operation aborted to prevent system crash. "
                f"Suggestion: Reduce dataset size, increase memory limit, "
                f"or use chunked processing."
            )

    def get_stats(self) -> dict[str, int | float]:
        """Get memory statistics for this monitoring session.

        The figures reflect the most recent ``__enter__``/``check`` call;
        this method does not query memory itself.

        Returns:
            Dictionary with memory statistics including:
                - start: Starting memory (bytes)
                - current: Memory at the most recent check (bytes)
                - peak: Peak memory usage (bytes)
                - delta: Peak memory minus starting memory (bytes)
                - duration: Monitoring duration (seconds); 0.0 if the
                  context was never entered

        Example:
            >>> with MemoryMonitor('operation') as monitor:
            ...     # ... do work ...
            ...     stats = monitor.get_stats()
            >>> print(f"Peak: {stats['peak'] / 1e6:.1f} MB")
        """
        duration = time.time() - self._start_time if self._start_time > 0 else 0.0

        return {
            "start": self.start_memory,
            "current": self.current_memory,
            "peak": self.peak_memory,
            "delta": self.peak_memory - self.start_memory,
            "duration": duration,
        }

    def get_snapshots(self) -> list[MemorySnapshot]:
        """Get all memory snapshots taken during monitoring.

        Returns:
            A shallow copy of the list of MemorySnapshot objects, so callers
            cannot alter the monitor's own history list.

        Example:
            >>> with MemoryMonitor('operation') as monitor:
            ...     # ... work ...
            ...     pass
            >>> for snap in monitor.get_snapshots():
            ...     print(f"t={snap.timestamp:.1f}s: {snap.available/1e9:.2f} GB available")
        """
        return self._snapshots.copy()

    def _get_process_memory(self) -> int:
        """Get current process memory usage in bytes.

        Returns:
            Resident set size (RSS) in bytes when psutil is importable;
            otherwise total minus available system memory as an estimate.
        """
        # Only the optional psutil import is guarded, so an unrelated
        # ImportError raised further down is not mistaken for "psutil missing".
        try:
            import psutil
        except ImportError:
            # Fallback: estimate from system memory
            from tracekit.utils.memory import get_total_memory

            return get_total_memory() - get_available_memory()

        process = psutil.Process()
        return process.memory_info().rss  # type: ignore[no-any-return]

    def _take_snapshot(self) -> None:
        """Append a snapshot of the current memory state to the history.

        Does nothing when psutil is not installed.
        """
        try:
            import psutil
        except ImportError:
            # Skip snapshots if psutil not available
            return

        from tracekit.utils.memory import get_memory_pressure

        mem_info = psutil.Process().memory_info()
        available = get_available_memory()
        pressure = get_memory_pressure()

        self._snapshots.append(
            MemorySnapshot(
                timestamp=time.time(),
                available=available,
                process_rss=mem_info.rss,
                process_vms=mem_info.vms,
                pressure=pressure,
            )
        )
@contextmanager
def monitor_memory(
    operation: str,
    *,
    max_memory: int | str | None = None,
    check_interval: int = 100,
) -> Iterator[MemoryMonitor]:
    """Run a block of code under a freshly created MemoryMonitor.

    Function-style shortcut for building a MemoryMonitor and entering it;
    the monitor is exited when the ``with`` block finishes.

    Args:
        operation: Name of operation being monitored.
        max_memory: Maximum memory limit.
        check_interval: Check memory every N iterations.

    Yields:
        The active MemoryMonitor instance.

    Example:
        >>> with monitor_memory('spectrogram', max_memory="4GB") as mon:
        ...     for i in range(1000):
        ...         # Work
        ...         mon.check(i)
    """
    # MemoryMonitor.__enter__ returns the monitor itself, so binding the
    # ``as`` target hands the caller the same object that was constructed.
    with MemoryMonitor(
        operation,
        max_memory=max_memory,
        check_interval=check_interval,
    ) as active_monitor:
        yield active_monitor
@dataclass
class ProgressWithMemory:
    """Progress report for an operation, bundled with memory figures.

    Attributes:
        current: Current progress value.
        total: Total progress value.
        eta_seconds: Estimated time to completion (seconds).
        memory_used: Current memory usage (bytes).
        memory_peak: Peak memory usage (bytes).
        memory_available: Available system memory (bytes).
        operation: Name of operation.
    """

    current: int
    total: int
    eta_seconds: float
    memory_used: int
    memory_peak: int
    memory_available: int
    operation: str

    @property
    def percent(self) -> float:
        """Progress as a percentage of ``total``.

        A ``total`` of 0 is reported as 100.0. The value is not clamped, so
        it exceeds 100.0 whenever ``current`` is larger than ``total``.
        """
        return (self.current / self.total) * 100.0 if self.total != 0 else 100.0

    @property
    def memory_pressure(self) -> float:
        """Memory pressure as reported by ``get_memory_pressure()``.

        The value is queried at access time; it is not derived from the
        memory fields stored on this instance. Documented range: 0.0-1.0.
        """
        from tracekit.utils.memory import get_memory_pressure as _query_pressure

        return _query_pressure()

    def format_progress(self) -> str:
        """Render the progress and memory figures on a single line.

        Returns:
            Formatted progress string with memory info (sizes in GB).

        Example:
            >>> progress = ProgressWithMemory(42, 100, 5.0, 1.2e9, 2.1e9, 6e9, "fft")
            >>> print(progress.format_progress())
            42.0% | 1.20 GB used | 2.10 GB peak | 6.00 GB avail | ETA 5s
        """
        segments = (
            f"{self.percent:.1f}%",
            f"{self.memory_used / 1e9:.2f} GB used",
            f"{self.memory_peak / 1e9:.2f} GB peak",
            f"{self.memory_available / 1e9:.2f} GB avail",
            f"ETA {self.eta_seconds:.0f}s",
        )
        return " | ".join(segments)
class ProgressMonitor:
    """Combined progress and memory monitoring.

    Tracks both operation progress and memory usage, providing
    unified progress updates with memory metrics.

    The internal MemoryMonitor is created with ``check_interval=1`` and is
    driven through ``check()`` only; it is never entered as a context
    manager, so only its current/peak figures are used here.

    Example:
        >>> monitor = ProgressMonitor('spectrogram', total=1000)
        >>> for i in range(1000):
        ...     # Work
        ...     monitor.update(i)
        ...     if i % 100 == 0:
        ...         progress = monitor.get_progress()
        ...         print(progress.format_progress())
    """

    def __init__(
        self,
        operation: str,
        total: int,
        *,
        callback: Callable[[ProgressWithMemory], None] | None = None,
        update_interval: int = 1,
    ):
        """Initialize progress monitor.

        Args:
            operation: Name of operation.
            total: Total number of items to process.
            callback: Optional function receiving a ProgressWithMemory.
            update_interval: Call callback every N updates. A value of 0 is
                treated as "every update".
        """
        self.operation = operation
        self.total = total
        self.callback = callback
        self.update_interval = update_interval
        self.current = 0
        self._start_time = time.time()
        self._memory_monitor = MemoryMonitor(operation, check_interval=1)
        self._update_count = 0

    def update(self, current: int | None = None) -> None:
        """Update progress, check memory, and notify the callback.

        Args:
            current: Current progress value (if None, increments by 1).

        Raises:
            MemoryError: Propagated from the internal memory check when
                memory pressure is critical.
        """
        if current is not None:
            self.current = current
        else:
            self.current += 1

        self._update_count += 1

        # Check memory on every update (the internal monitor's interval is 1).
        self._memory_monitor.check(self._update_count)

        if not self.callback:
            return

        # An interval of 0 cannot be used as a modulus; treat it as "notify on
        # every update" instead of raising ZeroDivisionError.
        interval = self.update_interval
        if interval == 0 or self._update_count % interval == 0:
            self.callback(self.get_progress())

    def get_progress(self) -> ProgressWithMemory:
        """Get current progress with memory metrics.

        The ETA is a linear extrapolation of the average time per completed
        item. It is 0.0 before any progress has been made and never negative,
        even if ``current`` has overshot ``total``.

        Returns:
            ProgressWithMemory instance.
        """
        elapsed = time.time() - self._start_time
        if self.current > 0:
            # Clamp so an overshoot (current > total) cannot give a negative ETA.
            remaining = max(self.total - self.current, 0)
            eta = elapsed / self.current * remaining
        else:
            eta = 0.0

        stats = self._memory_monitor.get_stats()

        return ProgressWithMemory(
            current=self.current,
            total=self.total,
            eta_seconds=eta,
            memory_used=stats["current"],  # type: ignore[arg-type]
            memory_peak=stats["peak"],  # type: ignore[arg-type]
            memory_available=get_available_memory(),
            operation=self.operation,
        )
# Public API of this module: the names exported by a star-import.
__all__ = [
    "MemoryMonitor",
    "MemorySnapshot",
    "ProgressMonitor",
    "ProgressWithMemory",
    "monitor_memory",
]