Coverage for src / tracekit / core / cancellation.py: 100%
99 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Enhanced cancellation support for TraceKit operations.

This module provides advanced cancellation features including signal handling,
cleanup routines, and resume support for long-running operations.

Example:
    >>> from tracekit.core.cancellation import CancellationManager
    >>> manager = CancellationManager()
    >>> with manager.cancellable_operation("Loading data"):
    ...     # ... long operation ...
    ...     manager.check_cancelled()

References:
    - Python threading best practices
    - Signal handling patterns
"""
19from __future__ import annotations
21import atexit
22import signal
23import threading
24import time
25from contextlib import contextmanager
26from typing import TYPE_CHECKING, Any
28if TYPE_CHECKING:
29 from collections.abc import Callable, Generator
class CancellationManager:
    """Manager for cancellable operations with cleanup support.

    Holds a cancellation flag (a ``threading.Event``), the cleanup callables
    to run when an operation is cancelled, and a dictionary of partial
    results that is handed to the caller through ``CancelledException``.

    Each cleanup callable runs at most once: running cleanup consumes the
    pending callables, so the several code paths that trigger cleanup
    (``check_cancelled()``, leaving ``cancellable_operation()``, interpreter
    exit) never repeat work that has already been done.

    Args:
        cleanup_callback: Optional callback for cleanup on cancellation
        auto_cleanup: Automatically cleanup on exit (default: True)

    Example:
        >>> from tracekit.core.cancellation import CancellationManager
        >>> def cleanup():
        ...     print("Cleaning up...")
        >>> manager = CancellationManager(cleanup_callback=cleanup)
        >>> manager.register_signal_handlers()
        >>> # Press Ctrl+C to trigger cancellation

    References:
        PROG-002: Cancellation Support
    """

    def __init__(
        self,
        *,
        cleanup_callback: Callable[[], None] | None = None,
        auto_cleanup: bool = True,
    ) -> None:
        """Initialize cancellation manager.

        Args:
            cleanup_callback: Function to call on cancellation. It runs
                before the functions registered with ``add_cleanup()``.
            auto_cleanup: Register cleanup at exit. When True, pending
                cleanup also runs at interpreter exit even if the operation
                was never cancelled.
        """
        self._cancelled = threading.Event()
        self._cleanup_callback = cleanup_callback
        self._cleanup_functions: list[Callable[[], None]] = []
        self._partial_results: dict[str, Any] = {}
        # Holds the operation name until cancel() replaces it with the reason.
        self._operation_name = ""
        # 0.0 means "no operation started yet" (see check_cancelled()).
        self._start_time = 0.0
        self._signal_handlers_registered = False

        if auto_cleanup:
            atexit.register(self._cleanup)

    def register_signal_handlers(self) -> None:
        """Register signal handlers for Ctrl+C (SIGINT) and SIGTERM.

        Both signals are routed to ``cancel()``, so after registration Ctrl+C
        requests cooperative cancellation instead of raising
        ``KeyboardInterrupt``. Calling this method again is a no-op. The
        handlers that were installed before are replaced and not restored.

        ``signal.signal`` may only be called from the main thread of the main
        interpreter; it raises ``ValueError`` elsewhere.

        Example:
            >>> manager.register_signal_handlers()
            >>> # Now Ctrl+C will trigger cancellation

        References:
            PROG-002: Ctrl+C handling
        """
        if self._signal_handlers_registered:
            return

        def signal_handler(signum: int, frame: Any) -> None:
            self.cancel(f"Received signal {signum}")

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
        self._signal_handlers_registered = True

    def cancel(self, reason: str = "Operation cancelled") -> None:
        """Request cancellation of the operation.

        The reason replaces the stored operation name and becomes the message
        of the ``CancelledException`` raised by ``check_cancelled()``.

        Args:
            reason: Reason for cancellation

        Example:
            >>> manager.cancel("User requested stop")

        References:
            PROG-002: cancel() method on operation handles
        """
        # Publish the reason before raising the flag so that a thread that
        # observes the flag never reports a stale message.
        self._operation_name = reason
        self._cancelled.set()

    def is_cancelled(self) -> bool:
        """Check if cancellation has been requested.

        Returns:
            True if operation should be cancelled

        Example:
            >>> if manager.is_cancelled():
            ...     return  # Exit early

        References:
            PROG-002: Cancellation Support
        """
        return self._cancelled.is_set()

    def check_cancelled(self) -> None:
        """Check cancellation status and raise if cancelled.

        Does nothing while no cancellation has been requested. Otherwise runs
        the pending cleanup and raises with a snapshot of the partial results
        and the time elapsed since ``cancellable_operation()`` was entered
        (0.0 when no operation was started).

        Raises:
            CancelledException: If cancellation has been requested

        Example:
            >>> manager.check_cancelled()  # Raises if cancelled

        References:
            PROG-002: Cancellation Support
        """
        if not self._cancelled.is_set():
            return

        self._cleanup()
        elapsed = time.time() - self._start_time if self._start_time > 0 else 0.0
        raise CancelledException(
            self._operation_name,
            # Snapshot: later store_partial_result() calls must not change
            # what an already-raised exception reports.
            partial_results=self.get_partial_results(),
            elapsed_time=elapsed,
        )

    def add_cleanup(self, cleanup_fn: Callable[[], None]) -> None:
        """Add a cleanup function to be called on cancellation.

        Functions run in registration order, after the constructor's
        ``cleanup_callback``. A function registered after cleanup has already
        run stays pending until the next cleanup pass.

        Args:
            cleanup_fn: Function to call for cleanup

        Example:
            >>> def cleanup_temp_files():
            ...     os.remove("temp.dat")
            >>> manager.add_cleanup(cleanup_temp_files)

        References:
            PROG-002: Cleanup on cancellation
        """
        self._cleanup_functions.append(cleanup_fn)

    def store_partial_result(self, key: str, value: Any) -> None:
        """Store partial result for retrieval after cancellation.

        Args:
            key: Result identifier; an existing entry with the same key is
                overwritten
            value: Partial result value

        Example:
            >>> manager.store_partial_result("samples_processed", 1000)

        References:
            PROG-002: Partial results available after cancellation
        """
        self._partial_results[key] = value

    def get_partial_results(self) -> dict[str, Any]:
        """Get partial results collected before cancellation.

        Returns:
            Shallow copy of the partial results dictionary

        Example:
            >>> try:
            ...     # ... operation ...
            ... except CancelledException as e:
            ...     results = manager.get_partial_results()

        References:
            PROG-002: Partial results available after cancellation
        """
        return self._partial_results.copy()

    def _cleanup(self) -> None:
        """Run every pending cleanup callable once, ignoring their errors.

        The user-provided callback runs first, followed by the functions
        added through ``add_cleanup()`` in registration order.

        References:
            PROG-002: Cleanup on cancellation
        """
        # Detach the pending work before running it. Cleanup is reached from
        # check_cancelled(), from cancellable_operation() and from atexit, and
        # without this a single cancellation ran every callable several times.
        callback, self._cleanup_callback = self._cleanup_callback, None
        pending, self._cleanup_functions = self._cleanup_functions, []
        if callback is not None:
            pending = [callback, *pending]

        for cleanup_fn in pending:
            try:
                cleanup_fn()
            except Exception:
                # Best effort: a failing step must not block the remaining ones.
                pass

    @contextmanager
    def cancellable_operation(
        self,
        name: str = "Operation",
    ) -> Generator[CancellationManager, None, None]:
        """Context manager for cancellable operations.

        Records the operation name and start time, then yields the manager.
        A ``KeyboardInterrupt`` raised inside the block is converted into a
        ``CancelledException``. Whenever the block is left with cancellation
        requested, pending cleanup is run.

        Args:
            name: Operation name for logging

        Yields:
            CancellationManager instance

        Raises:
            CancelledException: If operation is cancelled or interrupted.

        Example:
            >>> with manager.cancellable_operation("Loading data") as ctx:
            ...     for i in range(1000):
            ...         ctx.check_cancelled()
            ...         # ... process ...

        References:
            PROG-002: Cancellation Support
        """
        self._operation_name = name
        self._start_time = time.time()
        try:
            yield self
        except KeyboardInterrupt:
            self.cancel("Interrupted by user (Ctrl+C)")
            # Clean up before building the exception, as callers may inspect
            # the file system as soon as they catch it.
            self._cleanup()
            raise CancelledException(  # noqa: B904
                f"{name} interrupted by user",
                partial_results=self.get_partial_results(),
                elapsed_time=time.time() - self._start_time,
            )
        finally:
            # Covers cancel() without a following check_cancelled(); a no-op
            # when cleanup already ran on the way out.
            if self._cancelled.is_set():
                self._cleanup()
class CancelledException(Exception):
    """Exception raised when operation is cancelled.

    Carries the partial results and the elapsed time so that callers can
    salvage work done before the cancellation. ``str(exc)`` is the message
    followed by a summary of the elapsed time and the number of partial
    results.

    Attributes:
        message: Cancellation message (without the summary suffix)
        partial_results: Results collected before cancellation
        elapsed_time: Time elapsed before cancellation

    Example:
        >>> try:
        ...     manager.check_cancelled()
        ... except CancelledException as e:
        ...     print(f"Cancelled after {e.elapsed_time:.1f}s")
        ...     print(f"Partial results: {e.partial_results}")

    References:
        PROG-002: Partial results available after cancellation
    """

    def __init__(
        self,
        message: str,
        *,
        partial_results: dict[str, Any] | None = None,
        elapsed_time: float = 0.0,
    ) -> None:
        """Initialize CancelledException.

        Args:
            message: Cancellation message
            partial_results: Partial results dictionary; ``None`` or an empty
                dictionary is stored as a new empty dictionary
            elapsed_time: Elapsed time in seconds
        """
        self.message = message
        self.partial_results = partial_results or {}
        self.elapsed_time = elapsed_time
        super().__init__(
            f"{message} (elapsed: {elapsed_time:.1f}s, "
            f"partial results: {len(self.partial_results)} items)"
        )

    @classmethod
    def _rebuild(
        cls,
        message: str,
        partial_results: dict[str, Any],
        elapsed_time: float,
    ) -> CancelledException:
        """Recreate an instance from its raw fields (used by ``__reduce__``)."""
        return cls(message, partial_results=partial_results, elapsed_time=elapsed_time)

    def __reduce__(self) -> tuple[Any, ...]:
        """Describe how ``copy`` and ``pickle`` should rebuild this exception.

        The default for exceptions is ``type(self)(*self.args)``. Here
        ``args`` holds the already formatted text, so that default would feed
        it back in as the message and append the summary suffix a second
        time. Rebuilding from the raw fields keeps ``str()`` and ``args``
        identical on the copy.

        Returns:
            ``(callable, args, state)`` as defined by the pickle protocol;
            the state carries any extra attributes set on the instance.
        """
        return (
            type(self)._rebuild,
            (self.message, self.partial_results, self.elapsed_time),
            self.__dict__,
        )
class ResumableOperation:
    """Checkpoint/restore helper for operations that can resume after cancellation.

    Persistence is delegated entirely to the two callbacks supplied by the
    caller; this class only forwards to them and remembers the most recent
    state it has seen.

    Args:
        checkpoint_callback: Function to save checkpoint state
        restore_callback: Function to restore from checkpoint

    Example:
        >>> def save_state(state):
        ...     with open("checkpoint.json", "w") as f:
        ...         json.dump(state, f)
        >>> def load_state():
        ...     with open("checkpoint.json") as f:
        ...         return json.load(f)
        >>> op = ResumableOperation(save_state, load_state)

    References:
        PROG-002: Resume support where possible
    """

    def __init__(
        self,
        checkpoint_callback: Callable[[dict], None],  # type: ignore[type-arg]
        restore_callback: Callable[[], dict],  # type: ignore[type-arg]
    ) -> None:
        """Store the persistence callbacks and start with an empty state.

        Args:
            checkpoint_callback: Function to save state
            restore_callback: Function to restore state
        """
        self._state: dict[str, Any] = {}
        self._restore_callback = restore_callback
        self._checkpoint_callback = checkpoint_callback

    def checkpoint(self, state: dict[str, Any]) -> None:
        """Remember ``state`` and hand it to the checkpoint callback.

        The state is recorded on the instance before the callback runs, so it
        is kept even if the callback raises.

        Args:
            state: Current operation state

        Example:
            >>> op.checkpoint({"processed": 500, "total": 1000})

        References:
            PROG-002: Resume support
        """
        self._state = state
        save = self._checkpoint_callback
        save(state)

    def restore(self) -> dict[str, Any]:
        """Load the state through the restore callback and remember it.

        Returns:
            Restored state dictionary

        Example:
            >>> state = op.restore()
            >>> start_index = state.get("processed", 0)

        References:
            PROG-002: Resume support
        """
        restored = self._restore_callback()
        self._state = restored
        return restored

    def has_checkpoint(self) -> bool:
        """Report whether the restore callback currently succeeds.

        The callback is actually invoked as a probe and its result is
        discarded; any ``Exception`` it raises is read as "no checkpoint".

        Returns:
            True if checkpoint is available

        References:
            PROG-002: Resume support
        """
        try:
            self._restore_callback()
        except Exception:
            return False
        return True
def confirm_cancellation(
    operation_name: str = "operation",
    *,
    destructive: bool = False,
) -> bool:
    """Ask the user to confirm cancelling a destructive operation.

    Non-destructive operations are confirmed immediately without prompting.
    For destructive ones a ``[y/N]`` prompt is shown on standard input and
    only ``y`` / ``yes`` (any case, surrounding whitespace ignored) confirm.

    Args:
        operation_name: Name of operation to cancel
        destructive: Whether operation is destructive

    Returns:
        True if user confirms cancellation

    Example:
        >>> if confirm_cancellation("Delete files", destructive=True):
        ...     # Proceed with cancellation

    References:
        PROG-002: Cancel confirmation for destructive operations
    """
    if destructive:
        try:
            raw_answer = input(f"Cancel {operation_name}? This may lose data. [y/N]: ")
            answer = raw_answer.strip().lower()
            return answer in ("y", "yes")
        except (EOFError, KeyboardInterrupt):
            # Ctrl+C or end-of-input at the prompt is treated as a "yes".
            return True

    # Nothing can be lost, so no confirmation is needed.
    return True
434__all__ = [
435 "CancellationManager",
436 "CancelledException",
437 "ResumableOperation",
438 "confirm_cancellation",
439]