Coverage for src / tracekit / core / memory_progress.py: 98%
106 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Memory usage logging and progress tracking for TraceKit operations.

This module provides detailed memory profiling logs for debugging and monitoring
memory usage during long-running operations.


Example:
    >>> from tracekit.core.memory_progress import MemoryLogger
    >>> logger = MemoryLogger("analysis.log", format="csv")
    >>> with logger:
    ...     for i in range(1000):
    ...         # Perform work
    ...         logger.log_operation("fft", iteration=i)
    >>> stats = logger.get_summary()

References:
    psutil documentation for memory monitoring
"""
from __future__ import annotations

import csv
import json
import os
import time
from contextlib import contextmanager
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, TextIO

from tracekit.utils.memory import get_available_memory, get_memory_pressure

if TYPE_CHECKING:
    from collections.abc import Callable, Iterator
    from types import TracebackType
@dataclass
class MemoryLogEntry:
    """Single memory log entry.

    The field order is part of the interface: entries may be constructed
    positionally, and ``dataclasses.asdict`` output follows this order.

    Attributes:
        timestamp: Time of log entry (seconds since epoch).
        operation: Name of operation being performed.
        iteration: Iteration number (if applicable).
        memory_used: Process memory usage (bytes).
        memory_peak: Peak memory since start (bytes).
        memory_available: System available memory (bytes).
        memory_pressure: Memory pressure (0.0-1.0).
        eta_seconds: Estimated time to completion (seconds). Defaults to 0.0.
        message: Optional descriptive message. Defaults to an empty string.
    """

    timestamp: float
    operation: str
    iteration: int | None
    memory_used: int
    memory_peak: int
    memory_available: int
    memory_pressure: float
    # The two trailing fields are optional and use the same defaults as
    # ``MemoryLogger.log_operation`` (0.0 and "").
    eta_seconds: float = 0.0
    message: str = ""
class MemoryLogger:
    """Logger for detailed memory profiling during operations.

    Logs memory usage at each operation with timestamps and metadata.
    Supports CSV and JSON output formats. In CSV mode every entry is written
    to the log file as soon as it is recorded; in JSON mode all entries plus a
    summary are written as one document when the context exits.

    Args:
        log_file: Path to output log file.
        format: Output format ('csv' or 'json').
        auto_flush: Flush after each write (default: True).
        enable_console: Also print to console (default: False).

    Example:
        >>> logger = MemoryLogger("memory.csv", format="csv")
        >>> with logger:
        ...     for i in range(100):
        ...         # Do work
        ...         logger.log_operation("processing", iteration=i)
        >>> print(logger.get_summary())

    References:
        MEM-025: Memory Usage Logging
    """

    # Column order of the CSV output.
    _CSV_FIELDS = (
        "timestamp",
        "operation",
        "iteration",
        "memory_used",
        "memory_peak",
        "memory_available",
        "memory_pressure",
        "eta_seconds",
        "message",
    )

    def __init__(
        self,
        log_file: str | Path,
        *,
        format: Literal["csv", "json"] = "csv",
        auto_flush: bool = True,
        enable_console: bool = False,
    ):
        """Initialize memory logger.

        The parent directory of ``log_file`` is created immediately; the file
        itself is only opened when the context is entered.

        Args:
            log_file: Path to log file.
            format: Output format ('csv' or 'json').
            auto_flush: Flush after each entry.
            enable_console: Print to console as well.
        """
        self.log_file = Path(log_file)
        self.format = format
        self.auto_flush = auto_flush
        self.enable_console = enable_console

        # State. ``_start_time == 0.0`` doubles as "no baseline recorded yet".
        self._entries: list[MemoryLogEntry] = []
        self._file_handle: TextIO | None = None
        self._csv_writer: Any = None
        self._start_time = 0.0
        self._start_memory = 0
        self._peak_memory = 0

        # Create directory if needed
        self.log_file.parent.mkdir(parents=True, exist_ok=True)

    def __enter__(self) -> MemoryLogger:
        """Enter context: record the baseline and open the log file.

        Returns:
            This logger instance.
        """
        self._start_time = time.time()
        self._start_memory = self._get_process_memory()
        self._peak_memory = self._start_memory

        # Explicit UTF-8 so the output does not depend on the locale encoding;
        # newline="" is what the csv module requires of its file objects.
        handle = open(self.log_file, "w", newline="", encoding="utf-8")
        try:
            if self.format == "csv":
                self._csv_writer = csv.DictWriter(
                    handle, fieldnames=list(self._CSV_FIELDS)
                )
                self._csv_writer.writeheader()
                if self.auto_flush:
                    handle.flush()
        except BaseException:
            # Do not leak the handle when the header cannot be written.
            handle.close()
            self._csv_writer = None
            raise

        self._file_handle = handle
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context: write the JSON document (if any) and close the file.

        The exception arguments are unused; exceptions raised inside the
        ``with`` body are never suppressed.
        """
        handle = self._file_handle
        if handle is None:
            return

        try:
            # JSON output is written in one piece at the end of the session.
            if self.format == "json":
                document = {
                    "entries": [asdict(entry) for entry in self._entries],
                    "summary": self._get_summary_dict(),
                }
                json.dump(document, handle, indent=2)
        finally:
            # Always release the file, even when serialization fails.
            handle.close()
            self._file_handle = None
            self._csv_writer = None

    def log_operation(
        self,
        operation: str,
        *,
        iteration: int | None = None,
        eta_seconds: float = 0.0,
        message: str = "",
    ) -> None:
        """Log memory usage for an operation.

        The entry is always kept in memory. It is additionally written to the
        log file when the file is open and the format is CSV, and printed when
        console output is enabled.

        Args:
            operation: Name of operation.
            iteration: Iteration number (optional).
            eta_seconds: Estimated time to completion.
            message: Optional descriptive message.

        Example:
            >>> logger.log_operation("fft", iteration=100, eta_seconds=5.2)

        References:
            MEM-025: Memory Usage Logging
        """
        # Get memory metrics
        memory_used = self._get_process_memory()
        memory_available = get_available_memory()
        memory_pressure = get_memory_pressure()
        timestamp = time.time()

        # When used outside a ``with`` block no baseline exists yet; adopt the
        # first observation so elapsed times and deltas are not measured from
        # the epoch / from zero bytes.
        if not self._start_time:
            self._start_time = timestamp
            self._start_memory = memory_used

        # Update peak
        self._peak_memory = max(self._peak_memory, memory_used)

        entry = MemoryLogEntry(
            timestamp=timestamp,
            operation=operation,
            iteration=iteration,
            memory_used=memory_used,
            memory_peak=self._peak_memory,
            memory_available=memory_available,
            memory_pressure=memory_pressure,
            eta_seconds=eta_seconds,
            message=message,
        )
        self._entries.append(entry)

        # Write to file (CSV only; JSON is written on exit)
        handle = self._file_handle
        if handle is not None and self.format == "csv":
            self._csv_writer.writerow(asdict(entry))
            if self.auto_flush:
                handle.flush()

        # Console output
        if self.enable_console:
            print(self._format_entry(entry))

    def log_progress(
        self,
        operation: str,
        current: int,
        total: int,
        *,
        message: str = "",
    ) -> None:
        """Log memory usage with progress information.

        Convenience method that calculates ETA from progress by linear
        extrapolation of the time elapsed since the logger started. The ETA
        is 0.0 when ``current`` is not positive, when no start time has been
        recorded yet, or when ``current`` already exceeds ``total``.

        Args:
            operation: Name of operation.
            current: Current progress value.
            total: Total progress value.
            message: Optional message.

        Example:
            >>> for i in range(1000):
            ...     logger.log_progress("analysis", i + 1, 1000)

        References:
            MEM-024: Memory-Aware Progress Callback
            MEM-025: Memory Usage Logging
        """
        eta = 0.0
        if current > 0 and self._start_time:
            elapsed = time.time() - self._start_time
            # Clamp so overshooting ``total`` never yields a negative ETA.
            remaining = max(total - current, 0)
            eta = elapsed / current * remaining

        self.log_operation(
            operation,
            iteration=current,
            eta_seconds=eta,
            message=message,
        )

    def get_summary(self) -> str:
        """Get human-readable summary of memory usage.

        Returns:
            Formatted summary string.

        Example:
            >>> logger = MemoryLogger("test.log")
            >>> with logger:
            ...     logger.log_operation("test")
            >>> print(logger.get_summary())

        References:
            MEM-025: Memory Usage Logging
        """
        summary = self._get_summary_dict()

        return (
            f"Memory Usage Summary:\n"
            f" Entries: {summary['entry_count']}\n"
            f" Duration: {summary['duration']:.2f}s\n"
            f" Start Memory: {summary['start_memory'] / 1e9:.2f} GB\n"
            f" Peak Memory: {summary['peak_memory'] / 1e9:.2f} GB\n"
            f" Delta: {summary['memory_delta'] / 1e9:.2f} GB\n"
            f" Min Available: {summary['min_available'] / 1e9:.2f} GB\n"
            f" Max Pressure: {summary['max_pressure'] * 100:.1f}%\n"
        )

    def get_entries(self) -> list[MemoryLogEntry]:
        """Get all logged entries.

        Returns:
            A shallow copy of the list of memory log entries.

        References:
            MEM-025: Memory Usage Logging
        """
        return list(self._entries)

    def _get_summary_dict(self) -> dict[str, Any]:
        """Get summary statistics as dictionary.

        Duration is the time between the first and last logged entry. With no
        entries, the duration, delta, minimum available memory and maximum
        pressure are all reported as zero.
        """
        if not self._entries:
            return {
                "entry_count": 0,
                "duration": 0.0,
                "start_memory": self._start_memory,
                "peak_memory": self._peak_memory,
                "memory_delta": 0,
                "min_available": 0,
                "max_pressure": 0.0,
            }

        duration = self._entries[-1].timestamp - self._entries[0].timestamp
        min_available = min(e.memory_available for e in self._entries)
        max_pressure = max(e.memory_pressure for e in self._entries)

        return {
            "entry_count": len(self._entries),
            "duration": duration,
            "start_memory": self._start_memory,
            "peak_memory": self._peak_memory,
            "memory_delta": self._peak_memory - self._start_memory,
            "min_available": min_available,
            "max_pressure": max_pressure,
        }

    def _format_entry(self, entry: MemoryLogEntry) -> str:
        """Format entry for console output (sizes in units of 1e9 bytes)."""
        elapsed = entry.timestamp - self._start_time
        return (
            f"[{elapsed:7.2f}s] {entry.operation:20s} | "
            f"Used: {entry.memory_used / 1e9:6.2f} GB | "
            f"Peak: {entry.memory_peak / 1e9:6.2f} GB | "
            f"Avail: {entry.memory_available / 1e9:6.2f} GB | "
            f"Pressure: {entry.memory_pressure * 100:5.1f}%"
            + (f" | {entry.message}" if entry.message else "")
        )

    def _get_process_memory(self) -> int:
        """Get current process memory usage in bytes.

        Uses the resident set size reported by psutil when it is installed.
        """
        try:
            import psutil

            process = psutil.Process()
            return process.memory_info().rss  # type: ignore[no-any-return]
        except ImportError:
            # Fallback: estimate as total minus available memory, both taken
            # from tracekit.utils.memory.
            from tracekit.utils.memory import get_total_memory

            return get_total_memory() - get_available_memory()
@contextmanager
def log_memory(
    log_file: str | Path,
    *,
    format: Literal["csv", "json"] = "csv",
    enable_console: bool = False,
    auto_flush: bool = True,
) -> Iterator[MemoryLogger]:
    """Context manager for memory logging.

    Convenience function that wraps MemoryLogger: it builds a logger, enters
    it, yields it, and exits it when the ``with`` body finishes or raises.

    Args:
        log_file: Path to log file.
        format: Output format ('csv' or 'json').
        enable_console: Print to console.
        auto_flush: Flush the log file after each write. Forwarded to
            MemoryLogger; defaults to True, which is also MemoryLogger's
            own default.

    Yields:
        MemoryLogger instance.

    Example:
        >>> with log_memory("analysis.csv") as logger:
        ...     for i in range(1000):
        ...         # Do work
        ...         logger.log_progress("processing", i + 1, 1000)

    References:
        MEM-025: Memory Usage Logging
    """
    with MemoryLogger(
        log_file,
        format=format,
        auto_flush=auto_flush,
        enable_console=enable_console,
    ) as logger:
        yield logger
def create_progress_callback_with_logging(
    logger: MemoryLogger,
    operation: str,
) -> Callable[[int, int, str], None]:
    """Create progress callback that logs to MemoryLogger.

    Returns a callback function compatible with progress tracking APIs
    that automatically logs memory usage. Each call is forwarded to
    ``logger.log_progress`` under the fixed ``operation`` name.

    Args:
        logger: MemoryLogger instance to log to.
        operation: Name of operation.

    Returns:
        Progress callback function taking ``(current, total, message)``.
        ``message`` may be omitted and then defaults to an empty string.

    Example:
        >>> logger = MemoryLogger("test.log")
        >>> callback = create_progress_callback_with_logging(logger, "fft")
        >>> callback(50, 100, "Processing")

    References:
        MEM-024: Memory-Aware Progress Callback
        MEM-025: Memory Usage Logging
    """

    def callback(current: int, total: int, message: str = "") -> None:
        """Progress callback with memory logging."""
        logger.log_progress(operation, current, total, message=message)

    return callback
def enable_memory_logging_from_cli(
    log_file: str | Path | None = None,
) -> MemoryLogger | None:
    """Build a memory logger when logging is switched on via the environment.

    Logging counts as enabled when the ``TK_LOG_MEMORY`` environment variable,
    compared case-insensitively, is one of ``1``, ``true`` or ``yes``.

    Args:
        log_file: Override log file path. When omitted, a timestamped
            ``tracekit_memory_<YYYYmmdd_HHMMSS>.csv`` relative path is used.

    Returns:
        A CSV MemoryLogger with console output disabled if enabled,
        None otherwise.

    Example:
        >>> logger = enable_memory_logging_from_cli()
        >>> if logger:
        ...     with logger:
        ...         # Operations are logged
        ...         logger.log_operation("test")

    References:
        MEM-025: Memory Usage Logging
    """
    switch = os.environ.get("TK_LOG_MEMORY", "").lower()
    if switch in ("1", "true", "yes"):
        target = log_file
        if target is None:
            # No explicit path: derive one from the current local time.
            stamp = time.strftime("%Y%m%d_%H%M%S")
            target = Path(f"tracekit_memory_{stamp}.csv")
        return MemoryLogger(target, format="csv", enable_console=False)

    return None
# Public API of this module: the names exported by ``from ... import *``.
__all__ = [
    "MemoryLogEntry",
    "MemoryLogger",
    "create_progress_callback_with_logging",
    "enable_memory_logging_from_cli",
    "log_memory",
]