Coverage for src / tracekit / core / progress.py: 100%
99 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Progress tracking and cancellation support for TraceKit operations.
3This module provides progress callbacks, cancellation tokens, and memory warnings
4for long-running operations.
7Example:
8 >>> from tracekit.core.progress import ProgressCallback, CancellationToken
9 >>> token = CancellationToken()
10 >>> def progress_fn(current, total, message):
11 ... print(f"{current}/{total}: {message}")
12 >>> # Use in analysis functions
13 >>> result = analyze(data, progress_callback=progress_fn, cancel_token=token)
15References:
16 - WCAG 2.1 progress indication guidelines
17 - Python threading and multiprocessing best practices
18"""
20from __future__ import annotations
22import time
23import warnings
24from typing import TYPE_CHECKING, Protocol
26import psutil
28if TYPE_CHECKING:
29 from collections.abc import Callable
class ProgressCallback(Protocol):
    """Structural type for callables that receive progress notifications.

    Any callable accepting ``(current, total, message)`` and returning
    ``None`` satisfies this protocol; no inheritance is required.

    Args:
        current: Current progress value (e.g., samples processed)
        total: Total expected value (e.g., total samples)
        message: Descriptive message about the current operation

    Example:
        >>> def my_progress(current: int, total: int, message: str) -> None:
        ...     percent = 100 * current / total
        ...     print(f"{percent:.1f}%: {message}")

    References:
        PROG-001: Progress Indication for Long Operations
    """

    def __call__(self, current: int, total: int, message: str) -> None:
        """Report one progress step.

        Args:
            current: Number of items completed so far
            total: Number of items expected overall
            message: Human-readable status text
        """
class CancellationToken:
    """Flag object used to request that a long-running operation stop.

    The operation polls the token (``is_cancelled()``) or calls ``check()``
    at convenient points; whoever wants to stop the work calls ``cancel()``.

    Attributes:
        message: Reason passed to ``cancel()`` ("" until cancelled)
        cancelled_at: ``time.time()`` value recorded by ``cancel()``,
            or None while the token has not been cancelled

    Example:
        >>> from tracekit.core.progress import CancellationToken, CancelledError
        >>> token = CancellationToken()
        >>> # In analysis function:
        >>> for i in range(n_samples):
        ...     if token.is_cancelled():
        ...         raise CancelledError("Analysis cancelled by user")
        ...     # ... process sample ...

    References:
        PROG-002: Cancellation Support
    """

    def __init__(self) -> None:
        """Create a token in the not-cancelled state."""
        self._cancelled: bool = False
        self._message: str = ""
        self._cancelled_at: float | None = None

    def cancel(self, message: str = "Operation cancelled") -> None:
        """Mark the token as cancelled and record why and when.

        Calling this again overwrites the stored message and timestamp.

        Args:
            message: Reason for cancellation (default: "Operation cancelled")

        Example:
            >>> token = CancellationToken()
            >>> token.cancel("User requested stop")
            >>> assert token.is_cancelled()

        References:
            PROG-002: Cancellation Support
        """
        self._cancelled = True
        self._message = message
        requested_at = time.time()
        self._cancelled_at = requested_at

    def is_cancelled(self) -> bool:
        """Report whether ``cancel()`` has been called.

        Returns:
            True if the operation should stop

        Example:
            >>> token = CancellationToken()
            >>> if token.is_cancelled():
            ...     return  # Exit early

        References:
            PROG-002: Cancellation Support
        """
        return self._cancelled

    def check(self) -> None:
        """Raise if cancellation has been requested; otherwise do nothing.

        Raises:
            CancelledError: Carrying the stored cancellation message

        Example:
            >>> token = CancellationToken()
            >>> token.cancel()
            >>> token.check()  # Raises CancelledError

        References:
            PROG-002: Cancellation Support
        """
        if not self._cancelled:
            return
        raise CancelledError(self._message)

    @property
    def message(self) -> str:
        """Reason given to ``cancel()``.

        Returns:
            Cancellation message (empty string if never cancelled)

        References:
            PROG-002: Cancellation Support
        """
        return self._message

    @property
    def cancelled_at(self) -> float | None:
        """Time at which cancellation was requested.

        Returns:
            Timestamp in seconds since epoch, or None if not cancelled

        References:
            PROG-002: Cancellation Support
        """
        return self._cancelled_at
class CancelledError(Exception):
    """Exception raised when an operation is cancelled.

    Operations can catch this to save partial results before exiting.
    ``str(exc)`` is ``"<message> (<progress>% complete)"`` with the
    progress formatted to one decimal place.

    Attributes:
        message: Reason for cancellation
        progress: Progress percentage at cancellation (0-100)

    Example:
        >>> from tracekit.core.progress import CancelledError
        >>> try:
        ...     # ... long operation ...
        ...     raise CancelledError("User cancelled", progress=45.5)
        ... except CancelledError as e:
        ...     print(f"Cancelled at {e.progress}%: {e.message}")

    References:
        PROG-002: Cancellation Support
    """

    def __init__(self, message: str, *, progress: float = 0.0) -> None:
        """Initialize CancelledError.

        Args:
            message: Reason for cancellation
            progress: Progress percentage at cancellation (default: 0.0)
        """
        self.message = message
        self.progress = progress
        super().__init__(f"{message} ({progress:.1f}% complete)")

    def __reduce__(self) -> tuple[object, ...]:
        """Describe how to rebuild this exception for ``pickle`` and ``copy``.

        The default ``Exception`` reduction re-calls the constructor with
        ``self.args``, which here is the already formatted text. The rebuilt
        exception would then format it a second time, e.g.
        ``"msg (45.5% complete) (0.0% complete)"``. Rebuilding from the raw
        ``message`` and ``progress`` keeps ``str()`` and ``args`` stable.

        Returns:
            ``(callable, args, state)`` as defined by the pickle protocol
        """
        import functools

        # ``progress`` is keyword-only, so bind it with partial and pass the
        # message positionally.
        rebuild = functools.partial(type(self), progress=self.progress)
        # The state dict carries any extra attributes set on the instance.
        return rebuild, (self.message,), dict(vars(self))
def create_progress_tracker(
    total: int,
    *,
    callback: Callable[[int, int, str], None] | None = None,
    update_interval: float = 0.1,
) -> ProgressTracker:
    """Build a ProgressTracker for an operation.

    Convenience factory: all arguments are forwarded unchanged to the
    ``ProgressTracker`` constructor. The callback, when given, is invoked
    as ``callback(current, total, message)``.

    Args:
        total: Total number of items to process
        callback: Optional progress callback function
        update_interval: Minimum time between updates in seconds (default: 0.1)

    Returns:
        ProgressTracker instance

    Example:
        >>> from tracekit.core.progress import create_progress_tracker
        >>> tracker = create_progress_tracker(1000, callback=my_progress)
        >>> for i in range(1000):
        ...     tracker.update(i + 1, "Processing item")

    References:
        PROG-001: Progress Indication for Long Operations
    """
    tracker = ProgressTracker(
        total,
        callback=callback,
        update_interval=update_interval,
    )
    return tracker
class ProgressTracker:
    """Progress tracker with ETA calculation and throttling.

    The callback, when provided, is invoked as
    ``callback(current, total, message)``. Calls to ``update()`` that arrive
    sooner than ``update_interval`` seconds after the last reported one are
    recorded but not forwarded to the callback; ``finish()`` always reports.

    Args:
        total: Total number of items
        callback: Optional progress callback
        update_interval: Minimum seconds between updates

    Attributes:
        total: Total number of items
        current: Most recent progress value passed to ``update()``
        callback: Progress callback, or None
        update_interval: Minimum seconds between reported updates

    Example:
        >>> from tracekit.core.progress import ProgressTracker
        >>> tracker = ProgressTracker(1000)
        >>> for i in range(1000):
        ...     tracker.update(i + 1, "Processing")
        >>> tracker.finish("Complete")

    References:
        PROG-001: Progress Indication for Long Operations
    """

    def __init__(
        self,
        total: int,
        *,
        callback: Callable[[int, int, str], None] | None = None,
        update_interval: float = 0.1,
    ) -> None:
        """Initialize progress tracker.

        Args:
            total: Total items to process
            callback: Progress callback function
            update_interval: Minimum seconds between updates
        """
        self.total = total
        self.current = 0
        self.callback = callback
        self.update_interval = update_interval

        self._start_time = time.time()
        # Starting at 0.0 puts the "last report" far in the past, so the
        # first update() passes the throttle for any practical interval.
        self._last_update_time = 0.0
        self._finished = False

    def update(self, current: int, message: str = "") -> None:
        """Update progress.

        ``self.current`` is always updated; the callback is only invoked
        when at least ``update_interval`` seconds have passed since the
        previous reported update.

        Args:
            current: Current progress value
            message: Status message

        Example:
            >>> tracker.update(500, "Halfway done")

        References:
            PROG-001: Progress Indication for Long Operations
        """
        self.current = current

        # Throttle updates
        now = time.time()
        if now - self._last_update_time < self.update_interval:
            return

        self._last_update_time = now

        if self.callback:
            self.callback(current, self.total, message)

    def get_eta(self) -> float:
        """Calculate estimated time to completion.

        The estimate assumes the average rate since construction continues.
        Returns 0.0 when no estimate can be made (no progress yet, or no
        measurable elapsed time) and when ``current`` has reached or passed
        ``total``.

        Returns:
            Estimated seconds remaining (never negative)

        Example:
            >>> tracker.update(500, "Processing")
            >>> eta = tracker.get_eta()
            >>> print(f"ETA: {eta:.1f} seconds")

        References:
            PROG-001: Progress Indication for Long Operations
        """
        if self.current <= 0:
            return 0.0

        elapsed = time.time() - self._start_time
        # A coarse clock can report zero elapsed time and a wall-clock
        # adjustment can make it negative; neither yields a usable rate.
        if elapsed <= 0:
            return 0.0

        remaining = self.total - self.current
        # Overshooting the total must not produce a negative ETA.
        if remaining <= 0:
            return 0.0

        rate = self.current / elapsed
        return remaining / rate

    def get_progress_percent(self) -> float:
        """Get progress as percentage.

        Returns:
            Progress percentage; 100.0 when ``total`` is 0

        Example:
            >>> tracker.update(250, "Processing")
            >>> print(f"Progress: {tracker.get_progress_percent():.1f}%")

        References:
            PROG-001: Progress Indication for Long Operations
        """
        if self.total == 0:
            return 100.0
        return 100.0 * self.current / self.total

    def finish(self, message: str = "Complete") -> None:
        """Mark operation as finished.

        Sets ``current`` to ``total`` and reports to the callback without
        throttling.

        Args:
            message: Completion message (default: "Complete")

        Example:
            >>> tracker.finish("Analysis complete")

        References:
            PROG-001: Progress Indication for Long Operations
        """
        self._finished = True
        self.current = self.total

        if self.callback:
            self.callback(self.total, self.total, message)
def estimate_memory_usage(
    n_samples: int,
    dtype_bytes: int = 8,
    *,
    n_channels: int = 1,
    scratch_multiplier: float = 2.0,
) -> int:
    """Estimate how many bytes an operation will need.

    The raw data size (samples x bytes per sample x channels) is scaled by
    ``scratch_multiplier`` to allow for temporary arrays, then truncated to
    an integer.

    Args:
        n_samples: Number of samples
        dtype_bytes: Bytes per sample (default: 8 for float64)
        n_channels: Number of channels (default: 1)
        scratch_multiplier: Multiplier for temporary arrays (default: 2.0)

    Returns:
        Estimated memory usage in bytes

    Example:
        >>> from tracekit.core.progress import estimate_memory_usage
        >>> memory_bytes = estimate_memory_usage(1_000_000, dtype_bytes=8)
        >>> memory_mb = memory_bytes / (1024 ** 2)
        >>> print(f"Estimated: {memory_mb:.1f} MB")

    References:
        PROG-003: Memory Usage Warnings
    """
    # Size of the data itself, before any working copies.
    raw_bytes = n_samples * dtype_bytes * n_channels
    # Scale for scratch space (e.g., FFT work buffers) and drop the fraction.
    return int(raw_bytes * scratch_multiplier)
def check_memory_available(required_bytes: int, *, threshold: float = 0.8) -> bool:
    """Decide whether a memory request fits within the allowed budget.

    The budget is ``threshold`` times the available memory reported by
    ``psutil.virtual_memory()`` at the time of the call.

    Args:
        required_bytes: Required memory in bytes
        threshold: Maximum fraction of available RAM to use (default: 0.8)

    Returns:
        True if ``required_bytes`` does not exceed the budget

    Example:
        >>> from tracekit.core.progress import check_memory_available
        >>> required = 1024 * 1024 * 1024  # 1 GB
        >>> if not check_memory_available(required):
        ...     print("Warning: Insufficient memory")

    References:
        PROG-003: Memory Usage Warnings
    """
    budget_bytes = psutil.virtual_memory().available * threshold
    fits = required_bytes <= budget_bytes
    return fits  # type: ignore[no-any-return]
def warn_memory_usage(
    required_bytes: int,
    *,
    threshold: float = 0.8,
    suggest_chunked: bool = True,
) -> None:
    """Emit a warning when an operation may not fit in available memory.

    Nothing happens when ``required_bytes`` is within ``threshold`` times
    the available memory reported by ``psutil.virtual_memory()``. Otherwise
    a ``ResourceWarning`` is issued, attributed to the caller of this
    function.

    Note:
        Python's default warning filters ignore ``ResourceWarning``; it must
        be enabled (e.g. ``-W default::ResourceWarning``) to be displayed.

    Args:
        required_bytes: Required memory in bytes
        threshold: Maximum fraction of available RAM (default: 0.8)
        suggest_chunked: Suggest chunked processing (default: True)

    Example:
        >>> from tracekit.core.progress import warn_memory_usage
        >>> required = estimate_memory_usage(10_000_000)
        >>> warn_memory_usage(required)

    References:
        PROG-003: Memory Usage Warnings
    """
    available_bytes = psutil.virtual_memory().available
    threshold_bytes = available_bytes * threshold

    # Within budget: stay silent.
    if not required_bytes > threshold_bytes:
        return

    bytes_per_mb = 1024**2
    required_mb = required_bytes / bytes_per_mb
    available_mb = available_bytes / bytes_per_mb
    threshold_mb = threshold_bytes / bytes_per_mb

    parts = [
        f"Warning: Operation may require {required_mb:.1f} MB of memory, ",
        f"but only {available_mb:.1f} MB is available ",
        f"(threshold: {threshold_mb:.1f} MB).",
    ]
    if suggest_chunked:
        parts.append(" Consider using chunked processing or reducing the data size.")

    # stacklevel=2 points the warning at whoever called warn_memory_usage.
    warnings.warn("".join(parts), ResourceWarning, stacklevel=2)
def create_simple_progress(
    message_prefix: str = "Progress",
) -> Callable[[int, int, str], None]:
    """Build a callback that renders progress as a single console line.

    Each call rewrites the current stdout line (carriage return, no newline)
    with ``"<prefix>: <percent>% (<current>/<total>)"`` plus ``" - <message>"``
    when a message is given. A newline is printed once ``current`` reaches
    ``total``. The percentage is shown as 0 when ``total`` is not positive.

    Args:
        message_prefix: Prefix for progress messages (default: "Progress")

    Returns:
        Progress callback function

    Example:
        >>> from tracekit.core.progress import create_simple_progress
        >>> callback = create_simple_progress("Loading")
        >>> for i in range(100):
        ...     callback(i + 1, 100, "Processing")

    References:
        PROG-001: Progress Indication for Long Operations
    """

    def callback(current: int, total: int, message: str) -> None:
        if total > 0:
            percent = 100 * current / total
        else:
            percent = 0

        segments = [f"{message_prefix}: {percent:.1f}% ({current}/{total})"]
        if message:
            segments.append(message)

        # "\r" returns to column 0 so the next call overwrites this line.
        print("\r" + " - ".join(segments), end="", flush=True)
        if current >= total:
            print()  # New line when complete

    return callback
514__all__ = [
515 "CancellationToken",
516 "CancelledError",
517 "ProgressCallback",
518 "ProgressTracker",
519 "check_memory_available",
520 "create_progress_tracker",
521 "create_simple_progress",
522 "estimate_memory_usage",
523 "warn_memory_usage",
524]