Coverage for src / tracekit / core / progress.py: 100%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Progress tracking and cancellation support for TraceKit operations. 

2 

3This module provides progress callbacks, cancellation tokens, and memory warnings 

4for long-running operations. 

5 

6 

7Example: 

8 >>> from tracekit.core.progress import ProgressCallback, CancellationToken 

9 >>> token = CancellationToken() 

10 >>> def progress_fn(current, total, message): 

11 ... print(f"{current}/{total}: {message}") 

12 >>> # Use in analysis functions 

13 >>> result = analyze(data, progress_callback=progress_fn, cancel_token=token) 

14 

15References: 

16 - WCAG 2.1 progress indication guidelines 

17 - Python threading and multiprocessing best practices 

18""" 

19 

20from __future__ import annotations 

21 

22import time 

23import warnings 

24from typing import TYPE_CHECKING, Protocol 

25 

26import psutil 

27 

28if TYPE_CHECKING: 

29 from collections.abc import Callable 

30 

31 

class ProgressCallback(Protocol):
    """Structural type for callables that receive progress notifications.

    Any callable that accepts ``(current, total, message)`` and returns
    ``None`` satisfies this protocol; inheriting from it is not required.

    Args:
        current: Amount of work completed so far (e.g., samples processed)
        total: Amount of work expected overall (e.g., total samples)
        message: Human-readable description of the current operation

    Example:
        >>> def my_progress(current: int, total: int, message: str) -> None:
        ...     percent = 100 * current / total
        ...     print(f"{percent:.1f}%: {message}")

    References:
        PROG-001: Progress Indication for Long Operations
    """

    def __call__(self, current: int, total: int, message: str) -> None:
        """Report a single progress step.

        Args:
            current: Number of items completed
            total: Number of items to process in total
            message: Status text for this step
        """
        ...

61 

62 

class CancellationToken:
    """Flag object used to ask a long-running operation to stop.

    The operation polls the token (``is_cancelled()`` or ``check()``) at
    convenient points; another part of the program calls ``cancel()`` to
    request the stop.

    Attributes:
        message: Reason supplied with the cancellation request ("" until then)
        cancelled_at: Epoch timestamp of the request, or None if not cancelled

    Example:
        >>> from tracekit.core.progress import CancellationToken, CancelledError
        >>> token = CancellationToken()
        >>> # In analysis function:
        >>> for i in range(n_samples):
        ...     if token.is_cancelled():
        ...         raise CancelledError("Analysis cancelled by user")
        ...     # ... process sample ...

    References:
        PROG-002: Cancellation Support
    """

    def __init__(self) -> None:
        """Create a token in the not-cancelled state."""
        self._cancelled_at: float | None = None
        self._message: str = ""
        self._cancelled: bool = False

    def cancel(self, message: str = "Operation cancelled") -> None:
        """Record a cancellation request.

        Each call overwrites the stored message and timestamp.

        Args:
            message: Reason for cancellation (default: "Operation cancelled")

        Example:
            >>> token = CancellationToken()
            >>> token.cancel("User requested stop")
            >>> assert token.is_cancelled()

        References:
            PROG-002: Cancellation Support
        """
        self._cancelled_at = time.time()
        self._message = message
        self._cancelled = True

    def is_cancelled(self) -> bool:
        """Report whether ``cancel()`` has been called.

        Returns:
            True once cancellation has been requested, False before that

        Example:
            >>> token = CancellationToken()
            >>> if token.is_cancelled():
            ...     return  # Exit early

        References:
            PROG-002: Cancellation Support
        """
        return self._cancelled

    def check(self) -> None:
        """Raise if cancellation has been requested; otherwise do nothing.

        Raises:
            CancelledError: Carrying the stored cancellation message

        Example:
            >>> token = CancellationToken()
            >>> token.cancel()
            >>> token.check()  # Raises CancelledError

        References:
            PROG-002: Cancellation Support
        """
        if not self._cancelled:
            return
        raise CancelledError(self._message)

    @property
    def message(self) -> str:
        """Reason passed to ``cancel()``.

        Returns:
            The cancellation message, or "" if not cancelled

        References:
            PROG-002: Cancellation Support
        """
        return self._message

    @property
    def cancelled_at(self) -> float | None:
        """Time at which ``cancel()`` was last called.

        Returns:
            Timestamp in seconds since epoch, or None if not cancelled

        References:
            PROG-002: Cancellation Support
        """
        return self._cancelled_at

166 

167 

class CancelledError(Exception):
    """Exception raised when an operation is cancelled.

    Operations can catch this to save partial results before exiting.
    The rendered text (``str(err)``) is ``"<message> (<progress>% complete)"``.

    Instances survive ``pickle`` / ``copy`` round trips unchanged, so they
    can be passed between processes without the text being re-formatted.

    Attributes:
        message: Reason for cancellation
        progress: Progress percentage at cancellation (0-100)

    Example:
        >>> from tracekit.core.progress import CancelledError
        >>> try:
        ...     # ... long operation ...
        ...     raise CancelledError("User cancelled", progress=45.5)
        ... except CancelledError as e:
        ...     print(f"Cancelled at {e.progress}%: {e.message}")

    References:
        PROG-002: Cancellation Support
    """

    def __init__(self, message: str, *, progress: float = 0.0) -> None:
        """Initialize CancelledError.

        Args:
            message: Reason for cancellation
            progress: Progress percentage at cancellation (default: 0.0)
        """
        self.message = message
        self.progress = progress
        super().__init__(f"{message} ({progress:.1f}% complete)")

    def __reduce__(self) -> tuple[object, ...]:
        """Describe how to rebuild this exception for pickle and copy.

        The default ``Exception.__reduce__`` calls the class with ``self.args``,
        which here holds the already formatted text. That would run the text
        through ``__init__`` a second time and append a bogus
        "(0.0% complete)" suffix. Rebuilding from the raw message and
        progress keeps the rendered text identical.

        Returns:
            A ``(callable, args, state)`` tuple as defined by the pickle protocol
        """
        import functools

        # ``progress`` is keyword-only, so bind it with partial; the positional
        # args tuple can then carry just the raw message.
        rebuild = functools.partial(type(self), progress=self.progress)
        return (rebuild, (self.message,), dict(self.__dict__))

200 

201 

def create_progress_tracker(
    total: int,
    *,
    callback: Callable[[int, int, str], None] | None = None,
    update_interval: float = 0.1,
) -> ProgressTracker:
    """Build a ``ProgressTracker`` for an operation.

    Convenience factory that forwards its arguments unchanged to the
    ``ProgressTracker`` constructor. The callback, when given, is invoked
    by the tracker as ``callback(current, total, message)``.

    Args:
        total: Total number of items to process
        callback: Optional progress callback function
        update_interval: Minimum time between updates in seconds (default: 0.1)

    Returns:
        ProgressTracker instance

    Example:
        >>> from tracekit.core.progress import create_progress_tracker
        >>> tracker = create_progress_tracker(1000, callback=my_progress)
        >>> for i in range(1000):
        ...     tracker.update(i + 1, "Processing item")

    References:
        PROG-001: Progress Indication for Long Operations
    """
    tracker = ProgressTracker(
        total,
        callback=callback,
        update_interval=update_interval,
    )
    return tracker

231 

232 

class ProgressTracker:
    """Progress tracker with ETA calculation and throttling.

    Stores the latest progress value, forwards it to an optional callback
    as ``callback(current, total, message)`` no more often than once per
    ``update_interval`` seconds, and estimates the time remaining.

    Args:
        total: Total number of items
        callback: Optional progress callback
        update_interval: Minimum seconds between updates

    Example:
        >>> from tracekit.core.progress import ProgressTracker
        >>> tracker = ProgressTracker(1000)
        >>> for i in range(1000):
        ...     tracker.update(i + 1, "Processing")
        >>> tracker.finish("Complete")

    References:
        PROG-001: Progress Indication for Long Operations
    """

    def __init__(
        self,
        total: int,
        *,
        callback: Callable[[int, int, str], None] | None = None,
        update_interval: float = 0.1,
    ) -> None:
        """Initialize progress tracker.

        Args:
            total: Total items to process
            callback: Progress callback function
            update_interval: Minimum seconds between updates
        """
        self.total = total
        self.current = 0
        self.callback = callback
        self.update_interval = update_interval

        self._start_time = time.time()
        # 0.0 lies far in the past for an epoch clock, so the first update()
        # is never throttled.
        self._last_update_time = 0.0
        # Set by finish(); not consulted by the other methods of this class.
        self._finished = False

    def update(self, current: int, message: str = "") -> None:
        """Update progress.

        The stored progress value is always updated; the callback is only
        invoked when at least ``update_interval`` seconds have passed since
        the previous callback invocation.

        Args:
            current: Current progress value
            message: Status message

        Example:
            >>> tracker.update(500, "Halfway done")

        References:
            PROG-001: Progress Indication for Long Operations
        """
        self.current = current

        # Throttle updates
        now = time.time()
        if now - self._last_update_time < self.update_interval:
            return

        self._last_update_time = now

        if self.callback is not None:
            self.callback(current, self.total, message)

    def get_eta(self) -> float:
        """Calculate estimated time to completion.

        The estimate extrapolates the average rate since construction.

        Returns:
            Estimated seconds remaining; 0.0 when no estimate can be made
            (no progress yet, no measurable elapsed time) or when nothing
            remains (``current`` has reached or passed ``total``). Never
            negative.

        Example:
            >>> tracker.update(500, "Processing")
            >>> eta = tracker.get_eta()
            >>> print(f"ETA: {eta:.1f} seconds")

        References:
            PROG-001: Progress Indication for Long Operations
        """
        if self.current <= 0:
            return 0.0

        remaining = self.total - self.current
        if remaining <= 0:
            # Finished or overshot: a negative ETA would be meaningless.
            return 0.0

        elapsed = time.time() - self._start_time
        if elapsed <= 0:
            # Coarse clock or a clock adjustment: dividing by zero elapsed
            # time would raise ZeroDivisionError.
            return 0.0

        rate = self.current / elapsed
        return remaining / rate

    def get_progress_percent(self) -> float:
        """Get progress as percentage.

        Returns:
            ``100 * current / total``; 100.0 when ``total`` is 0

        Example:
            >>> tracker.update(250, "Processing")
            >>> print(f"Progress: {tracker.get_progress_percent():.1f}%")

        References:
            PROG-001: Progress Indication for Long Operations
        """
        if self.total == 0:
            return 100.0
        return 100.0 * self.current / self.total

    def finish(self, message: str = "Complete") -> None:
        """Mark operation as finished.

        Sets progress to ``total`` and invokes the callback once, bypassing
        the throttle so the final state is always reported.

        Args:
            message: Completion message (default: "Complete")

        Example:
            >>> tracker.finish("Analysis complete")

        References:
            PROG-001: Progress Indication for Long Operations
        """
        self._finished = True
        self.current = self.total

        if self.callback is not None:
            self.callback(self.total, self.total, message)

363 

364 

def estimate_memory_usage(
    n_samples: int,
    dtype_bytes: int = 8,
    *,
    n_channels: int = 1,
    scratch_multiplier: float = 2.0,
) -> int:
    """Estimate how many bytes an operation will need.

    The estimate is the raw array size
    (``n_samples * dtype_bytes * n_channels``) scaled by
    ``scratch_multiplier`` to allow for temporary arrays (e.g., during an
    FFT), truncated to an integer.

    Args:
        n_samples: Number of samples
        dtype_bytes: Bytes per sample (default: 8 for float64)
        n_channels: Number of channels (default: 1)
        scratch_multiplier: Multiplier for temporary arrays (default: 2.0)

    Returns:
        Estimated memory usage in bytes

    Example:
        >>> from tracekit.core.progress import estimate_memory_usage
        >>> memory_bytes = estimate_memory_usage(1_000_000, dtype_bytes=8)
        >>> memory_mb = memory_bytes / (1024 ** 2)
        >>> print(f"Estimated: {memory_mb:.1f} MB")

    References:
        PROG-003: Memory Usage Warnings
    """
    payload_bytes = n_samples * dtype_bytes * n_channels
    return int(scratch_multiplier * payload_bytes)

402 

403 

def check_memory_available(required_bytes: int, *, threshold: float = 0.8) -> bool:
    """Tell whether a memory request fits within the allowed share of free RAM.

    Reads the currently available system memory via
    ``psutil.virtual_memory()`` and compares the request against
    ``available * threshold``.

    Args:
        required_bytes: Required memory in bytes
        threshold: Maximum fraction of available RAM to use (default: 0.8)

    Returns:
        True if ``required_bytes`` does not exceed the allowed budget

    Example:
        >>> from tracekit.core.progress import check_memory_available
        >>> required = 1024 * 1024 * 1024  # 1 GB
        >>> if not check_memory_available(required):
        ...     print("Warning: Insufficient memory")

    References:
        PROG-003: Memory Usage Warnings
    """
    budget_bytes = psutil.virtual_memory().available * threshold
    fits: bool = required_bytes <= budget_bytes
    return fits

431 

432 

def warn_memory_usage(
    required_bytes: int,
    *,
    threshold: float = 0.8,
    suggest_chunked: bool = True,
) -> None:
    """Emit a warning when an operation may exceed available memory.

    Compares ``required_bytes`` with ``threshold`` times the memory that
    ``psutil.virtual_memory()`` currently reports as available. Does
    nothing when the request fits.

    Note:
        The warning category is ``ResourceWarning``, which Python's default
        warning filters ignore. It is only visible when warnings are
        enabled (e.g., ``-W default``, development mode, or an explicit
        ``warnings.simplefilter``).

    Args:
        required_bytes: Required memory in bytes
        threshold: Maximum fraction of available RAM (default: 0.8)
        suggest_chunked: Suggest chunked processing (default: True)

    Example:
        >>> from tracekit.core.progress import warn_memory_usage
        >>> required = estimate_memory_usage(10_000_000)
        >>> warn_memory_usage(required)

    References:
        PROG-003: Memory Usage Warnings
    """
    mib = 1024**2

    free_bytes = psutil.virtual_memory().available
    limit_bytes = free_bytes * threshold

    if not required_bytes > limit_bytes:
        return

    need_mb = required_bytes / mib
    free_mb = free_bytes / mib
    limit_mb = limit_bytes / mib

    text = (
        f"Warning: Operation may require {need_mb:.1f} MB of memory, "
        f"but only {free_mb:.1f} MB is available "
        f"(threshold: {limit_mb:.1f} MB)."
    )
    if suggest_chunked:
        text = text + " Consider using chunked processing or reducing the data size."

    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(text, ResourceWarning, stacklevel=2)

476 

477 

def create_simple_progress(
    message_prefix: str = "Progress",
) -> Callable[[int, int, str], None]:
    """Build a callback that prints a one-line progress status to stdout.

    Each call rewrites the current terminal line as
    ``"<prefix>: <percent>% (<current>/<total>)"``, followed by
    ``" - <message>"`` when a message is given. A newline is printed once
    ``current`` reaches ``total``.

    Args:
        message_prefix: Prefix for progress messages (default: "Progress")

    Returns:
        Progress callback function

    Example:
        >>> from tracekit.core.progress import create_simple_progress
        >>> callback = create_simple_progress("Loading")
        >>> for i in range(100):
        ...     callback(i + 1, 100, "Processing")

    References:
        PROG-001: Progress Indication for Long Operations
    """

    def callback(current: int, total: int, message: str) -> None:
        # A non-positive total has no meaningful ratio, so show 0%.
        if total > 0:
            percent = 100 * current / total
        else:
            percent = 0

        detail = f" - {message}" if message else ""
        line = f"{message_prefix}: {percent:.1f}% ({current}/{total})" + detail

        # The carriage return moves to column 0 so this update overwrites
        # the previous one.
        print(f"\r{line}", end="", flush=True)

        if current >= total:
            print()  # New line when complete

    return callback

512 

513 

# Public API of this module: the names exported by ``from ... import *``.
__all__ = [
    "CancellationToken",
    "CancelledError",
    "ProgressCallback",
    "ProgressTracker",
    "check_memory_available",
    "create_progress_tracker",
    "create_simple_progress",
    "estimate_memory_usage",
    "warn_memory_usage",
]