Coverage for src / tracekit / core / memory_progress.py: 98%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Memory usage logging and progress tracking for TraceKit operations. 

2 

3This module provides detailed memory profiling logs for debugging and monitoring 

4memory usage during long-running operations. 

5 

6 

7Example: 

8 >>> from tracekit.core.memory_progress import MemoryLogger 

9 >>> logger = MemoryLogger("analysis.log", format="csv") 

10 >>> with logger: 

11 ... for i in range(1000): 

12 ... # Perform work 

13 ... logger.log_operation("fft", iteration=i) 

14 >>> stats = logger.get_summary() 

15 

16References: 

17 psutil documentation for memory monitoring 

18""" 

19 

20from __future__ import annotations 

21 

22import csv 

23import json 

24import os 

25import time 

26from contextlib import contextmanager 

27from dataclasses import asdict, dataclass 

28from pathlib import Path 

29from typing import TYPE_CHECKING, Any, Literal, TextIO 

30 

31from tracekit.utils.memory import get_available_memory, get_memory_pressure 

32 

33if TYPE_CHECKING: 

34 from collections.abc import Iterator 

35 

36 

37@dataclass 

38class MemoryLogEntry: 

39 """Single memory log entry. 

40 

41 

42 Attributes: 

43 timestamp: Time of log entry (seconds since epoch). 

44 operation: Name of operation being performed. 

45 iteration: Iteration number (if applicable). 

46 memory_used: Process memory usage (bytes). 

47 memory_peak: Peak memory since start (bytes). 

48 memory_available: System available memory (bytes). 

49 memory_pressure: Memory pressure (0.0-1.0). 

50 eta_seconds: Estimated time to completion (seconds). 

51 message: Optional descriptive message. 

52 """ 

53 

54 timestamp: float 

55 operation: str 

56 iteration: int | None 

57 memory_used: int 

58 memory_peak: int 

59 memory_available: int 

60 memory_pressure: float 

61 eta_seconds: float 

62 message: str 

63 

64 

65class MemoryLogger: 

66 """Logger for detailed memory profiling during operations. 

67 

68 

69 Logs memory usage at each operation with timestamps and metadata. 

70 Supports CSV and JSON output formats. 

71 

72 Args: 

73 log_file: Path to output log file. 

74 format: Output format ('csv' or 'json'). 

75 auto_flush: Flush after each write (default: True). 

76 enable_console: Also print to console (default: False). 

77 

78 Example: 

79 >>> logger = MemoryLogger("memory.csv", format="csv") 

80 >>> with logger: 

81 ... for i in range(100): 

82 ... # Do work 

83 ... logger.log_operation("processing", iteration=i) 

84 >>> print(logger.get_summary()) 

85 

86 References: 

87 MEM-025: Memory Usage Logging 

88 """ 

89 

90 def __init__( 

91 self, 

92 log_file: str | Path, 

93 *, 

94 format: Literal["csv", "json"] = "csv", 

95 auto_flush: bool = True, 

96 enable_console: bool = False, 

97 ): 

98 """Initialize memory logger. 

99 

100 Args: 

101 log_file: Path to log file. 

102 format: Output format ('csv' or 'json'). 

103 auto_flush: Flush after each entry. 

104 enable_console: Print to console as well. 

105 """ 

106 self.log_file = Path(log_file) 

107 self.format = format 

108 self.auto_flush = auto_flush 

109 self.enable_console = enable_console 

110 

111 # State 

112 self._entries: list[MemoryLogEntry] = [] 

113 self._file_handle: TextIO | None = None 

114 self._csv_writer: Any = None 

115 self._start_time = 0.0 

116 self._start_memory = 0 

117 self._peak_memory = 0 

118 

119 # Create directory if needed 

120 self.log_file.parent.mkdir(parents=True, exist_ok=True) 

121 

122 def __enter__(self) -> MemoryLogger: 

123 """Enter context and initialize logging.""" 

124 self._start_time = time.time() 

125 self._start_memory = self._get_process_memory() 

126 self._peak_memory = self._start_memory 

127 

128 # Open log file 

129 self._file_handle = open(self.log_file, "w", newline="") 

130 

131 # Initialize CSV writer if needed 

132 if self.format == "csv": 

133 self._csv_writer = csv.DictWriter( 

134 self._file_handle, # type: ignore[arg-type] 

135 fieldnames=[ 

136 "timestamp", 

137 "operation", 

138 "iteration", 

139 "memory_used", 

140 "memory_peak", 

141 "memory_available", 

142 "memory_pressure", 

143 "eta_seconds", 

144 "message", 

145 ], 

146 ) 

147 self._csv_writer.writeheader() # type: ignore[attr-defined] 

148 if self.auto_flush: 148 ↛ 151line 148 didn't jump to line 151 because the condition on line 148 was always true

149 self._file_handle.flush() # type: ignore[attr-defined] 

150 

151 return self 

152 

153 def __exit__(self, exc_type, exc_val, exc_tb) -> None: # type: ignore[no-untyped-def] 

154 """Exit context and finalize logging.""" 

155 # Note: exc_val and exc_tb intentionally unused but required for Python 3.11+ compatibility 

156 # Write summary for JSON format 

157 if self.format == "json" and self._file_handle: 

158 summary = { # type: ignore[unreachable] 

159 "entries": [asdict(entry) for entry in self._entries], 

160 "summary": self._get_summary_dict(), 

161 } 

162 json.dump(summary, self._file_handle, indent=2) 

163 

164 # Close file 

165 if self._file_handle: 165 ↛ exitline 165 didn't return from function '__exit__' because the condition on line 165 was always true

166 self._file_handle.close() # type: ignore[unreachable] 

167 self._file_handle = None 

168 

169 def log_operation( 

170 self, 

171 operation: str, 

172 *, 

173 iteration: int | None = None, 

174 eta_seconds: float = 0.0, 

175 message: str = "", 

176 ) -> None: 

177 """Log memory usage for an operation. 

178 

179 

180 Args: 

181 operation: Name of operation. 

182 iteration: Iteration number (optional). 

183 eta_seconds: Estimated time to completion. 

184 message: Optional descriptive message. 

185 

186 Example: 

187 >>> logger.log_operation("fft", iteration=100, eta_seconds=5.2) 

188 

189 References: 

190 MEM-025: Memory Usage Logging 

191 """ 

192 # Get memory metrics 

193 memory_used = self._get_process_memory() 

194 memory_available = get_available_memory() 

195 memory_pressure = get_memory_pressure() 

196 

197 # Update peak 

198 self._peak_memory = max(self._peak_memory, memory_used) 

199 

200 # Create entry 

201 entry = MemoryLogEntry( 

202 timestamp=time.time(), 

203 operation=operation, 

204 iteration=iteration, 

205 memory_used=memory_used, 

206 memory_peak=self._peak_memory, 

207 memory_available=memory_available, 

208 memory_pressure=memory_pressure, 

209 eta_seconds=eta_seconds, 

210 message=message, 

211 ) 

212 

213 self._entries.append(entry) 

214 

215 # Write to file 

216 if self._file_handle and self.format == "csv": # type: ignore[unreachable] 

217 self._csv_writer.writerow(asdict(entry)) # type: ignore[unreachable] 

218 if self.auto_flush: 218 ↛ 222line 218 didn't jump to line 222 because the condition on line 218 was always true

219 self._file_handle.flush() 

220 

221 # Console output 

222 if self.enable_console: 

223 print(self._format_entry(entry)) 

224 

225 def log_progress( 

226 self, 

227 operation: str, 

228 current: int, 

229 total: int, 

230 *, 

231 message: str = "", 

232 ) -> None: 

233 """Log memory usage with progress information. 

234 

235 

236 Convenience method that calculates ETA from progress. 

237 

238 Args: 

239 operation: Name of operation. 

240 current: Current progress value. 

241 total: Total progress value. 

242 message: Optional message. 

243 

244 Example: 

245 >>> for i in range(1000): 

246 ... logger.log_progress("analysis", i + 1, 1000) 

247 

248 References: 

249 MEM-024: Memory-Aware Progress Callback 

250 MEM-025: Memory Usage Logging 

251 """ 

252 # Calculate ETA 

253 elapsed = time.time() - self._start_time 

254 eta = elapsed / current * (total - current) if current > 0 else 0.0 

255 

256 self.log_operation( 

257 operation, 

258 iteration=current, 

259 eta_seconds=eta, 

260 message=message, 

261 ) 

262 

263 def get_summary(self) -> str: 

264 """Get human-readable summary of memory usage. 

265 

266 Returns: 

267 Formatted summary string. 

268 

269 Example: 

270 >>> logger = MemoryLogger("test.log") 

271 >>> with logger: 

272 ... logger.log_operation("test") 

273 >>> print(logger.get_summary()) 

274 

275 References: 

276 MEM-025: Memory Usage Logging 

277 """ 

278 summary = self._get_summary_dict() 

279 

280 return ( 

281 f"Memory Usage Summary:\n" 

282 f" Entries: {summary['entry_count']}\n" 

283 f" Duration: {summary['duration']:.2f}s\n" 

284 f" Start Memory: {summary['start_memory'] / 1e9:.2f} GB\n" 

285 f" Peak Memory: {summary['peak_memory'] / 1e9:.2f} GB\n" 

286 f" Delta: {summary['memory_delta'] / 1e9:.2f} GB\n" 

287 f" Min Available: {summary['min_available'] / 1e9:.2f} GB\n" 

288 f" Max Pressure: {summary['max_pressure'] * 100:.1f}%\n" 

289 ) 

290 

291 def get_entries(self) -> list[MemoryLogEntry]: 

292 """Get all logged entries. 

293 

294 Returns: 

295 List of memory log entries. 

296 

297 References: 

298 MEM-025: Memory Usage Logging 

299 """ 

300 return self._entries.copy() 

301 

302 def _get_summary_dict(self) -> dict: # type: ignore[type-arg] 

303 """Get summary statistics as dictionary.""" 

304 if not self._entries: 

305 return { 

306 "entry_count": 0, 

307 "duration": 0.0, 

308 "start_memory": self._start_memory, 

309 "peak_memory": self._peak_memory, 

310 "memory_delta": 0, 

311 "min_available": 0, 

312 "max_pressure": 0.0, 

313 } 

314 

315 duration = self._entries[-1].timestamp - self._entries[0].timestamp 

316 min_available = min(e.memory_available for e in self._entries) 

317 max_pressure = max(e.memory_pressure for e in self._entries) 

318 

319 return { 

320 "entry_count": len(self._entries), 

321 "duration": duration, 

322 "start_memory": self._start_memory, 

323 "peak_memory": self._peak_memory, 

324 "memory_delta": self._peak_memory - self._start_memory, 

325 "min_available": min_available, 

326 "max_pressure": max_pressure, 

327 } 

328 

329 def _format_entry(self, entry: MemoryLogEntry) -> str: 

330 """Format entry for console output.""" 

331 elapsed = entry.timestamp - self._start_time 

332 return ( 

333 f"[{elapsed:7.2f}s] {entry.operation:20s} | " 

334 f"Used: {entry.memory_used / 1e9:6.2f} GB | " 

335 f"Peak: {entry.memory_peak / 1e9:6.2f} GB | " 

336 f"Avail: {entry.memory_available / 1e9:6.2f} GB | " 

337 f"Pressure: {entry.memory_pressure * 100:5.1f}%" 

338 + (f" | {entry.message}" if entry.message else "") 

339 ) 

340 

341 def _get_process_memory(self) -> int: 

342 """Get current process memory usage in bytes.""" 

343 try: 

344 import psutil 

345 

346 process = psutil.Process() 

347 return process.memory_info().rss # type: ignore[no-any-return] 

348 except ImportError: 

349 # Fallback: estimate from system memory 

350 from tracekit.utils.memory import get_total_memory 

351 

352 return get_total_memory() - get_available_memory() 

353 

354 

355@contextmanager 

356def log_memory( 

357 log_file: str | Path, 

358 *, 

359 format: Literal["csv", "json"] = "csv", 

360 enable_console: bool = False, 

361) -> Iterator[MemoryLogger]: 

362 """Context manager for memory logging. 

363 

364 

365 Convenience function that wraps MemoryLogger. 

366 

367 Args: 

368 log_file: Path to log file. 

369 format: Output format ('csv' or 'json'). 

370 enable_console: Print to console. 

371 

372 Yields: 

373 MemoryLogger instance. 

374 

375 Example: 

376 >>> with log_memory("analysis.csv") as logger: 

377 ... for i in range(1000): 

378 ... # Do work 

379 ... logger.log_progress("processing", i + 1, 1000) 

380 

381 References: 

382 MEM-025: Memory Usage Logging 

383 """ 

384 logger = MemoryLogger(log_file, format=format, enable_console=enable_console) 

385 with logger: 

386 yield logger 

387 

388 

389def create_progress_callback_with_logging( 

390 logger: MemoryLogger, 

391 operation: str, 

392) -> callable: # type: ignore[valid-type] 

393 """Create progress callback that logs to MemoryLogger. 

394 

395 

396 Returns a callback function compatible with progress tracking APIs 

397 that automatically logs memory usage. 

398 

399 Args: 

400 logger: MemoryLogger instance to log to. 

401 operation: Name of operation. 

402 

403 Returns: 

404 Progress callback function. 

405 

406 Example: 

407 >>> logger = MemoryLogger("test.log") 

408 >>> callback = create_progress_callback_with_logging(logger, "fft") 

409 >>> callback(50, 100, "Processing") 

410 

411 References: 

412 MEM-024: Memory-Aware Progress Callback 

413 MEM-025: Memory Usage Logging 

414 """ 

415 

416 def callback(current: int, total: int, message: str) -> None: 

417 """Progress callback with memory logging.""" 

418 logger.log_progress(operation, current, total, message=message) 

419 

420 return callback 

421 

422 

423def enable_memory_logging_from_cli( 

424 log_file: str | Path | None = None, 

425) -> MemoryLogger | None: 

426 """Enable memory logging from CLI flag. 

427 

428 

429 Checks for --log-memory CLI flag and returns logger if enabled. 

430 

431 Args: 

432 log_file: Override log file path (default: auto-generate). 

433 

434 Returns: 

435 MemoryLogger if enabled, None otherwise. 

436 

437 Example: 

438 >>> logger = enable_memory_logging_from_cli() 

439 >>> if logger: 

440 ... with logger: 

441 ... # Operations are logged 

442 ... logger.log_operation("test") 

443 

444 References: 

445 MEM-025: Memory Usage Logging 

446 """ 

447 # Check environment variable 

448 if os.environ.get("TK_LOG_MEMORY", "").lower() not in ("1", "true", "yes"): 

449 return None 

450 

451 # Generate log file name if not provided 

452 if log_file is None: 

453 timestamp = time.strftime("%Y%m%d_%H%M%S") 

454 log_file = Path(f"tracekit_memory_{timestamp}.csv") 

455 

456 return MemoryLogger(log_file, format="csv", enable_console=False) 

457 

458 

459__all__ = [ 

460 "MemoryLogEntry", 

461 "MemoryLogger", 

462 "create_progress_callback_with_logging", 

463 "enable_memory_logging_from_cli", 

464 "log_memory", 

465]