Coverage for src / tracekit / core / cancellation.py: 100%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Enhanced cancellation support for TraceKit operations. 

2 

3This module provides advanced cancellation features including signal handling, 

4cleanup routines, and resume support for long-running operations. 

5 

6 

7Example: 

8 >>> from tracekit.core.cancellation import CancellationManager 

9 >>> manager = CancellationManager() 

10 >>> with manager.cancellable_operation("Loading data"): 

11 ... # ... long operation ... 

12 ... manager.check_cancelled() 

13 

14References: 

15 - Python threading best practices 

16 - Signal handling patterns 

17""" 

18 

19from __future__ import annotations 

20 

21import atexit 

22import signal 

23import threading 

24import time 

25from contextlib import contextmanager 

26from typing import TYPE_CHECKING, Any 

27 

28if TYPE_CHECKING: 

29 from collections.abc import Callable, Generator 

30 

31 

class CancellationManager:
    """Coordinate cooperative cancellation, cleanup, and partial results.

    The manager wraps a ``threading.Event`` that is set by :meth:`cancel`,
    either directly or from the SIGINT/SIGTERM handlers installed by
    :meth:`register_signal_handlers`.  Long-running code polls the flag with
    :meth:`is_cancelled` or :meth:`check_cancelled`; the latter raises
    ``CancelledException`` carrying everything saved through
    :meth:`store_partial_result`.

    Cleanup is one-shot: the ``cleanup_callback`` and every function added
    with :meth:`add_cleanup` run at most once, no matter how many code paths
    (``check_cancelled``, the ``cancellable_operation`` context manager, the
    ``atexit`` hook) end up requesting cleanup.

    Args:
        cleanup_callback: Optional callable invoked first during cleanup.
        auto_cleanup: When true (the default), cleanup is also registered
            with ``atexit`` so it runs at interpreter exit.

    Example:
        >>> from tracekit.core.cancellation import CancellationManager
        >>> def cleanup():
        ...     print("Cleaning up...")
        >>> manager = CancellationManager(cleanup_callback=cleanup)
        >>> manager.register_signal_handlers()
        >>> # Press Ctrl+C to trigger cancellation

    References:
        PROG-002: Cancellation Support
    """

    def __init__(
        self,
        *,
        cleanup_callback: Callable[[], None] | None = None,
        auto_cleanup: bool = True,
    ) -> None:
        """Initialize cancellation manager.

        Args:
            cleanup_callback: Function to call on cancellation.
            auto_cleanup: Register cleanup to run at interpreter exit.
        """
        self._cancelled = threading.Event()
        self._cleanup_callback = cleanup_callback
        # Tracks whether ``cleanup_callback`` has already been invoked so that
        # repeated cleanup requests do not call it again.
        self._cleanup_callback_done = False
        self._cleanup_functions: list[Callable[[], None]] = []
        self._partial_results: dict[str, Any] = {}
        self._operation_name = ""
        self._start_time = 0.0
        self._signal_handlers_registered = False

        if auto_cleanup:
            atexit.register(self._cleanup)

    def register_signal_handlers(self) -> None:
        """Install SIGINT and SIGTERM handlers that request cancellation.

        The handlers only call :meth:`cancel`; the running operation notices
        the request at its next :meth:`check_cancelled` /
        :meth:`is_cancelled` poll.  Calling this method again after a
        successful registration is a no-op.  Previously installed handlers
        are replaced and are not restored by this class.

        Raises:
            ValueError: Propagated from ``signal.signal`` when called outside
                the main thread of the main interpreter.

        Example:
            >>> manager.register_signal_handlers()
            >>> # Now Ctrl+C will trigger cancellation

        References:
            PROG-002: Ctrl+C handling
        """
        if self._signal_handlers_registered:
            return

        def signal_handler(signum: int, frame: Any) -> None:
            self.cancel(f"Received signal {signum}")

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
        self._signal_handlers_registered = True

    def cancel(self, reason: str = "Operation cancelled") -> None:
        """Request cancellation of the operation.

        Sets the cancellation flag and records ``reason``.  The reason is
        stored in the same slot as the operation name, so it becomes the
        message of the ``CancelledException`` raised by a subsequent
        :meth:`check_cancelled`.

        Args:
            reason: Reason for cancellation.

        Example:
            >>> manager.cancel("User requested stop")

        References:
            PROG-002: cancel() method on operation handles
        """
        self._cancelled.set()
        self._operation_name = reason

    def is_cancelled(self) -> bool:
        """Report whether cancellation has been requested.

        Returns:
            True if the cancellation flag is set.

        Example:
            >>> if manager.is_cancelled():
            ...     return  # Exit early

        References:
            PROG-002: Cancellation Support
        """
        return self._cancelled.is_set()

    def check_cancelled(self) -> None:
        """Raise ``CancelledException`` if cancellation has been requested.

        When the flag is set, pending cleanup is executed first and the
        raised exception carries a snapshot of the stored partial results and
        the time elapsed since :meth:`cancellable_operation` was entered
        (``0.0`` if no operation was started).

        Raises:
            CancelledException: If cancellation has been requested.

        Example:
            >>> manager.check_cancelled()  # Raises if cancelled

        References:
            PROG-002: Cancellation Support
        """
        if not self._cancelled.is_set():
            return

        self._cleanup()
        elapsed = time.time() - self._start_time if self._start_time > 0 else 0.0
        raise CancelledException(
            self._operation_name,
            # Snapshot: the exception must not alias the manager's live dict.
            partial_results=dict(self._partial_results),
            elapsed_time=elapsed,
        )

    def add_cleanup(self, cleanup_fn: Callable[[], None]) -> None:
        """Register a function to run during cleanup.

        Functions run in registration order, after ``cleanup_callback``.
        A function registered after cleanup has already happened is kept and
        runs on the next cleanup request.

        Args:
            cleanup_fn: Function to call for cleanup.

        Example:
            >>> def cleanup_temp_files():
            ...     os.remove("temp.dat")
            >>> manager.add_cleanup(cleanup_temp_files)

        References:
            PROG-002: Cleanup on cancellation
        """
        self._cleanup_functions.append(cleanup_fn)

    def store_partial_result(self, key: str, value: Any) -> None:
        """Store a partial result for retrieval after cancellation.

        Args:
            key: Result identifier; an existing entry is overwritten.
            value: Partial result value.

        Example:
            >>> manager.store_partial_result("samples_processed", 1000)

        References:
            PROG-002: Partial results available after cancellation
        """
        self._partial_results[key] = value

    def get_partial_results(self) -> dict[str, Any]:
        """Return a shallow copy of the partial results stored so far.

        Returns:
            Dictionary of partial results.

        Example:
            >>> try:
            ...     # ... operation ...
            ... except CancelledException as e:
            ...     results = manager.get_partial_results()

        References:
            PROG-002: Partial results available after cancellation
        """
        return self._partial_results.copy()

    def _cleanup(self) -> None:
        """Run pending cleanup work, each piece at most once.

        The user callback runs first, then the registered functions in
        registration order.  Cleanup is best-effort: an ``Exception`` raised
        by one step is ignored so the remaining steps still run.

        References:
            PROG-002: Cleanup on cancellation
        """
        # Claim the pending work before running any of it, so a repeated or
        # re-entrant cleanup request finds nothing left to execute.
        callback_pending = not self._cleanup_callback_done
        self._cleanup_callback_done = True
        pending, self._cleanup_functions = self._cleanup_functions, []

        if callback_pending and self._cleanup_callback is not None:
            try:
                self._cleanup_callback()
            except Exception:
                pass  # Best-effort: ignore cleanup errors

        for cleanup_fn in pending:
            try:
                cleanup_fn()
            except Exception:
                pass  # Best-effort: ignore cleanup errors

    @contextmanager
    def cancellable_operation(
        self,
        name: str = "Operation",
    ) -> Generator[CancellationManager, None, None]:
        """Context manager delimiting one cancellable operation.

        Records ``name`` and the start time, then yields the manager.  A
        ``KeyboardInterrupt`` escaping the body is converted into a
        ``CancelledException``; a ``CancelledException`` raised inside the
        body propagates unchanged.  On exit, cleanup runs if cancellation was
        requested.

        Args:
            name: Operation name, used in the interruption message.

        Yields:
            This ``CancellationManager`` instance.

        Raises:
            CancelledException: If the operation is cancelled or interrupted.

        Example:
            >>> with manager.cancellable_operation("Loading data") as ctx:
            ...     for i in range(1000):
            ...         ctx.check_cancelled()
            ...         # ... process ...

        References:
            PROG-002: Cancellation Support
        """
        self._operation_name = name
        self._start_time = time.time()
        try:
            yield self
        except KeyboardInterrupt as interrupt:
            # Setting the flag here makes the ``finally`` clause below run the
            # cleanup, so no explicit cleanup call is needed in this branch.
            self.cancel("Interrupted by user (Ctrl+C)")
            raise CancelledException(
                f"{name} interrupted by user",
                partial_results=dict(self._partial_results),
                elapsed_time=time.time() - self._start_time,
            ) from interrupt
        finally:
            if self._cancelled.is_set():
                self._cleanup()

271 

272 

class CancelledException(Exception):
    """Exception raised when an operation is cancelled.

    The string form of the exception is the message followed by the elapsed
    time and the number of partial results.  Instances survive ``copy`` and
    ``pickle`` round-trips unchanged (see :meth:`__reduce__`).

    Attributes:
        message: Cancellation message as passed in (without the suffix).
        partial_results: Results collected before cancellation.
        elapsed_time: Time elapsed before cancellation, in seconds.

    Example:
        >>> try:
        ...     manager.check_cancelled()
        ... except CancelledException as e:
        ...     print(f"Cancelled after {e.elapsed_time:.1f}s")
        ...     print(f"Partial results: {e.partial_results}")

    References:
        PROG-002: Partial results available after cancellation
    """

    def __init__(
        self,
        message: str,
        *,
        partial_results: dict[str, Any] | None = None,
        elapsed_time: float = 0.0,
    ) -> None:
        """Initialize CancelledException.

        Args:
            message: Cancellation message.
            partial_results: Partial results dictionary; ``None`` or an empty
                dictionary is stored as a fresh empty dictionary.
            elapsed_time: Elapsed time in seconds.
        """
        self.message = message
        self.partial_results = partial_results or {}
        self.elapsed_time = elapsed_time
        super().__init__(
            f"{message} (elapsed: {elapsed_time:.1f}s, "
            f"partial results: {len(self.partial_results)} items)"
        )

    @classmethod
    def _rebuild(
        cls,
        message: str,
        partial_results: dict[str, Any],
        elapsed_time: float,
    ) -> CancelledException:
        """Recreate an instance from its original constructor arguments."""
        return cls(message, partial_results=partial_results, elapsed_time=elapsed_time)

    def __reduce__(self) -> tuple[Any, ...]:
        """Support ``copy`` and ``pickle`` without corrupting the message.

        The default exception reduction re-invokes ``__init__`` with
        ``self.args``, i.e. the already formatted text, which would append
        the elapsed/partial-results suffix a second time.  Rebuilding from
        the raw constructor arguments keeps ``args`` and ``str()`` stable.

        Returns:
            A ``(callable, args, state)`` reduction tuple.
        """
        return (
            type(self)._rebuild,
            (self.message, self.partial_results, self.elapsed_time),
            # Carry over any additional attributes set on the instance.
            dict(self.__dict__),
        )

315 

316 

class ResumableOperation:
    """Checkpoint/restore helper for operations that can be resumed.

    Persistence is delegated entirely to two caller-supplied callables; this
    class only forwards to them and remembers the most recent state it saw.

    Args:
        checkpoint_callback: Callable that persists a state dictionary.
        restore_callback: Callable that loads and returns a state dictionary.

    Example:
        >>> def save_state(state):
        ...     with open("checkpoint.json", "w") as f:
        ...         json.dump(state, f)
        >>> def load_state():
        ...     with open("checkpoint.json") as f:
        ...         return json.load(f)
        >>> op = ResumableOperation(save_state, load_state)

    References:
        PROG-002: Resume support where possible
    """

    def __init__(
        self,
        checkpoint_callback: Callable[[dict], None],  # type: ignore[type-arg]
        restore_callback: Callable[[], dict],  # type: ignore[type-arg]
    ) -> None:
        """Store the persistence callbacks and start with an empty state.

        Args:
            checkpoint_callback: Function to save state.
            restore_callback: Function to restore state.
        """
        self._state: dict[str, Any] = {}
        self._restore_callback = restore_callback
        self._checkpoint_callback = checkpoint_callback

    def checkpoint(self, state: dict[str, Any]) -> None:
        """Remember ``state`` and hand it to the checkpoint callback.

        The state is recorded on the instance before the callback is invoked.

        Args:
            state: Current operation state.

        Example:
            >>> op.checkpoint({"processed": 500, "total": 1000})

        References:
            PROG-002: Resume support
        """
        self._state = state
        persist = self._checkpoint_callback
        persist(state)

    def restore(self) -> dict[str, Any]:
        """Load the state through the restore callback and return it.

        The loaded state also replaces the state remembered on the instance.

        Returns:
            Restored state dictionary.

        Example:
            >>> state = op.restore()
            >>> start_index = state.get("processed", 0)

        References:
            PROG-002: Resume support
        """
        loaded = self._restore_callback()
        self._state = loaded
        return loaded

    def has_checkpoint(self) -> bool:
        """Probe for a checkpoint by invoking the restore callback.

        The callback's return value is discarded; only whether it raised an
        ``Exception`` matters.

        Returns:
            True if the restore callback completed without raising.

        References:
            PROG-002: Resume support
        """
        try:
            self._restore_callback()
        except Exception:
            return False
        else:
            return True

399 

400 

def confirm_cancellation(
    operation_name: str = "operation",
    *,
    destructive: bool = False,
) -> bool:
    """Ask the user to confirm cancelling a destructive operation.

    Non-destructive operations are confirmed immediately without prompting.
    For destructive ones a ``[y/N]`` prompt is shown; only ``y`` or ``yes``
    (case-insensitive, surrounding whitespace ignored) confirms.  If the
    prompt itself is aborted with end-of-input or Ctrl+C, the cancellation
    is treated as confirmed.

    Args:
        operation_name: Name of operation to cancel, shown in the prompt.
        destructive: Whether operation is destructive.

    Returns:
        True if the cancellation is confirmed.

    Example:
        >>> if confirm_cancellation("Delete files", destructive=True):
        ...     # Proceed with cancellation

    References:
        PROG-002: Cancel confirmation for destructive operations
    """
    if destructive:
        try:
            answer = input(f"Cancel {operation_name}? This may lose data. [y/N]: ")
            normalized = answer.strip().lower()
            confirmed = normalized in ("y", "yes")
        except (EOFError, KeyboardInterrupt):
            # An aborted prompt counts as a "yes".
            confirmed = True
        return confirmed

    return True

432 

433 

434__all__ = [ 

435 "CancellationManager", 

436 "CancelledException", 

437 "ResumableOperation", 

438 "confirm_cancellation", 

439]