# Coverage for session_buddy/interruption_manager.py: 23.93% (352 statements)
# Report generated by coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
"""Smart Interruption Management module for context switch detection and auto-save.

This module provides intelligent interruption handling including:
- Context switch detection (app/window changes)
- Automatic session state preservation
- Smart recovery from interruptions
- Focus tracking and restoration
"""

import asyncio
import gzip
import json
import logging
import os
import sqlite3
import threading
import time
from collections.abc import Callable
from contextlib import closing
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    import psutil
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer

# Optional dependency: psutil is used for process inspection. When it is
# missing, the code paths guarded by PSUTIL_AVAILABLE are skipped.
try:
    import psutil

    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False

# Optional dependency: watchdog is used for file-system change events.
try:
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer

    WATCHDOG_AVAILABLE = True
except ImportError:
    WATCHDOG_AVAILABLE = False
    # Both names are referenced at import time further down (a class in this
    # module subclasses FileSystemEventHandler), so each needs a stand-in or
    # the module raises NameError when watchdog is not installed.
    FileSystemEventHandler = object  # type: ignore[assignment,misc]
    Observer = object  # type: ignore[assignment]

# gzip is always available in Python stdlib
COMPRESSION_AVAILABLE = True

logger = logging.getLogger(__name__)

class InterruptionType(Enum):
    """Types of interruptions detected.

    Each member's value is the string written to the ``event_type`` column
    of the ``interruption_events`` table.
    """

    APP_SWITCH = "app_switch"
    WINDOW_CHANGE = "window_change"
    SYSTEM_IDLE = "system_idle"
    FOCUS_LOST = "focus_lost"
    FILE_CHANGE = "file_change"
    PROCESS_CHANGE = "process_change"
    MANUAL_SAVE = "manual_save"

class ContextState(Enum):
    """Context preservation states.

    Each member's value is the string stored in the ``state`` column of the
    ``session_contexts`` table.
    """

    ACTIVE = "active"
    INTERRUPTED = "interrupted"
    PRESERVED = "preserved"
    RESTORED = "restored"
    LOST = "lost"

@dataclass
class InterruptionEvent:
    """Interruption event with context information.

    One instance corresponds to one row of the ``interruption_events``
    table; the dict-valued fields are stored there as JSON text.
    """

    id: str  # primary key of the stored row
    event_type: InterruptionType
    timestamp: datetime
    source_context: dict[str, Any]  # JSON-encoded on storage
    target_context: dict[str, Any]  # JSON-encoded on storage
    duration: float | None  # seconds of focus before the event, if reported
    recovery_data: dict[str, Any] | None  # JSON-encoded on storage ({} when None)
    auto_saved: bool
    user_id: str
    project_id: str | None

90@dataclass
91class SessionContext:
92 """Current session context information."""
94 session_id: str
95 user_id: str
96 project_id: str | None
97 active_app: str | None
98 active_window: str | None
99 working_directory: str
100 open_files: list[str]
101 cursor_positions: dict[str, Any]
102 environment_vars: dict[str, str]
103 process_state: dict[str, Any]
104 last_activity: datetime
105 focus_duration: float
106 interruption_count: int
107 recovery_attempts: int
110class FocusTracker:
111 """Tracks application and window focus changes."""
113 def __init__(self, callback: Callable[..., Any] | None = None) -> None:
114 """Initialize focus tracker."""
115 self.callback = callback
116 self.current_app: str | None = None
117 self.current_window: str | None = None
118 self.last_check = time.time()
119 self.focus_start = time.time()
120 self.running = False
121 self._monitor_thread: threading.Thread | None = None
123 def start_monitoring(self) -> None:
124 """Start focus monitoring."""
125 if self.running:
126 return
128 self.running = True
129 self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
130 self._monitor_thread.start()
132 def stop_monitoring(self) -> None:
133 """Stop focus monitoring."""
134 self.running = False
135 if self._monitor_thread and self._monitor_thread.is_alive():
136 self._monitor_thread.join(timeout=2.0)
138 def _monitor_loop(self) -> None:
139 """Focus monitoring loop."""
140 while self.running:
141 try:
142 self._check_focus_change()
143 time.sleep(1.0) # Check every second
144 except Exception as e:
145 logger.exception(f"Focus monitoring error: {e}")
146 time.sleep(5.0) # Wait longer on error
148 def _check_focus_change(self) -> None:
149 """Check for focus changes using cross-platform methods."""
150 try:
151 current_app = self._get_active_application()
152 current_window = self._get_active_window()
154 now = time.time()
156 # Detect app switch
157 if current_app != self.current_app:
158 focus_duration = now - self.focus_start
160 if self.callback and self.current_app:
161 self.callback(
162 {
163 "type": InterruptionType.APP_SWITCH,
164 "source_app": self.current_app,
165 "target_app": current_app,
166 "focus_duration": focus_duration,
167 "timestamp": datetime.now(),
168 },
169 )
171 self.current_app = current_app
172 self.focus_start = now
174 # Detect window change within same app
175 elif current_window != self.current_window:
176 focus_duration = now - self.focus_start
178 if self.callback and self.current_window:
179 self.callback(
180 {
181 "type": InterruptionType.WINDOW_CHANGE,
182 "source_window": self.current_window,
183 "target_window": current_window,
184 "app": current_app,
185 "focus_duration": focus_duration,
186 "timestamp": datetime.now(),
187 },
188 )
190 self.current_window = current_window
191 self.focus_start = now
193 self.last_check = now
195 except Exception as e:
196 logger.debug(f"Focus check failed: {e}")
198 def _get_active_application(self) -> str | None:
199 """Get currently active application name."""
200 if not PSUTIL_AVAILABLE:
201 return None
203 try:
204 # Try to get the foreground process
205 # This is a simplified cross-platform approach
206 for proc in psutil.process_iter(["pid", "name"]):
207 try:
208 # Basic heuristic: look for common GUI applications
209 name = proc.info["name"]
210 if isinstance(name, str) and any(
211 gui_hint in name.lower()
212 for gui_hint in ("code", "browser", "terminal", "editor", "ide")
213 ):
214 return name
215 except (psutil.NoSuchProcess, psutil.AccessDenied):
216 continue
218 return None
220 except Exception:
221 return None
223 def _get_active_window(self) -> str | None:
224 """Get currently active window title."""
225 # This would require platform-specific implementations
226 # For now, return a placeholder
227 return f"Window_{int(time.time() % 1000)}"
class FileChangeHandler(FileSystemEventHandler):
    """Handles file system change events.

    Forwards file (not directory) events to ``callback`` as
    ``InterruptionType.FILE_CHANGE`` dicts. Modification events are
    debounced per path; creation and deletion events are always forwarded.
    """

    def __init__(self, callback: Callable[..., Any] | None = None) -> None:
        """Initialize file change handler.

        Args:
            callback: Called with a single event dict for each forwarded
                change; ``None`` disables forwarding.
        """
        super().__init__()
        self.callback = callback
        # path -> time.time() of the last forwarded modification
        self.last_events: dict[str, float] = {}
        self.debounce_time = 1.0  # Seconds

    def _notify(self, file_path: Any, change: str) -> None:
        """Send one FILE_CHANGE event dict to the callback, if one is set."""
        if self.callback:
            self.callback(
                {
                    "type": InterruptionType.FILE_CHANGE,
                    "file_path": file_path,
                    "event_type": change,
                    "timestamp": datetime.now(),
                },
            )

    def on_modified(self, event: Any) -> None:
        """Handle file modification, dropping repeats within ``debounce_time``."""
        if event.is_directory:
            return

        now = time.time()
        file_path = event.src_path

        # Debounce rapid changes
        last_seen = self.last_events.get(file_path)
        if last_seen is not None and now - last_seen < self.debounce_time:
            return

        self.last_events[file_path] = now
        self._notify(file_path, "modified")

    def on_created(self, event: Any) -> None:
        """Handle file creation."""
        if event.is_directory:
            return

        self._notify(event.src_path, "created")

    def on_deleted(self, event: Any) -> None:
        """Handle file deletion."""
        if event.is_directory:
            return

        # The path no longer exists, so its debounce timestamp is dead
        # weight; dropping it keeps last_events from growing without bound.
        self.last_events.pop(event.src_path, None)
        self._notify(event.src_path, "deleted")

296class InterruptionManager:
297 """Manages interruption detection and context preservation."""
299 def __init__(self, db_path: str | None = None) -> None:
300 """Initialize interruption manager."""
301 self.db_path = db_path or str(
302 Path.home() / ".claude" / "data" / "interruption_manager.db",
303 )
304 self._lock = threading.Lock()
305 self.current_context: SessionContext | None = None
306 self.focus_tracker = FocusTracker(callback=self._handle_interruption)
307 self.file_observer: Any = None
308 self.file_handler = FileChangeHandler(callback=self._handle_interruption)
309 self.auto_save_enabled = True
310 self.save_threshold = 30.0 # Auto-save after 30 seconds of focus
311 self.idle_threshold = 300.0 # 5 minutes idle detection
312 self._preservation_callbacks: list[Callable[..., Any]] = []
313 self._restoration_callbacks: list[Callable[..., Any]] = []
314 self._init_database()
316 def _init_database(self) -> None:
317 """Initialize SQLite database for interruption tracking."""
318 Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
320 with sqlite3.connect(self.db_path) as conn:
321 conn.execute("""
322 CREATE TABLE IF NOT EXISTS interruption_events (
323 id TEXT PRIMARY KEY,
324 event_type TEXT NOT NULL,
325 timestamp TIMESTAMP,
326 source_context TEXT, -- JSON
327 target_context TEXT, -- JSON
328 duration REAL,
329 recovery_data TEXT, -- JSON
330 auto_saved BOOLEAN,
331 user_id TEXT NOT NULL,
332 project_id TEXT
333 )
334 """)
336 conn.execute("""
337 CREATE TABLE IF NOT EXISTS session_contexts (
338 session_id TEXT PRIMARY KEY,
339 user_id TEXT NOT NULL,
340 project_id TEXT,
341 context_data TEXT, -- JSON
342 state TEXT NOT NULL,
343 created_at TIMESTAMP,
344 updated_at TIMESTAMP,
345 preserved_at TIMESTAMP,
346 restore_count INTEGER DEFAULT 0
347 )
348 """)
350 conn.execute("""
351 CREATE TABLE IF NOT EXISTS context_snapshots (
352 id TEXT PRIMARY KEY,
353 session_id TEXT NOT NULL,
354 snapshot_type TEXT NOT NULL,
355 timestamp TIMESTAMP,
356 data BLOB, -- Compressed context data
357 metadata TEXT -- JSON
358 )
359 """)
361 # Create indices
362 conn.execute(
363 "CREATE INDEX IF NOT EXISTS idx_interruptions_timestamp ON interruption_events(timestamp)",
364 )
365 conn.execute(
366 "CREATE INDEX IF NOT EXISTS idx_interruptions_user ON interruption_events(user_id)",
367 )
368 conn.execute(
369 "CREATE INDEX IF NOT EXISTS idx_contexts_user ON session_contexts(user_id)",
370 )
371 conn.execute(
372 "CREATE INDEX IF NOT EXISTS idx_contexts_state ON session_contexts(state)",
373 )
374 conn.execute(
375 "CREATE INDEX IF NOT EXISTS idx_snapshots_session ON context_snapshots(session_id)",
376 )
378 def start_monitoring(
379 self,
380 working_directory: str = ".",
381 watch_files: bool = True,
382 ) -> None:
383 """Start interruption monitoring."""
384 # Start focus tracking
385 self.focus_tracker.start_monitoring()
387 # Start file watching if requested
388 if watch_files and WATCHDOG_AVAILABLE:
389 try:
390 self.file_observer = Observer()
391 self.file_observer.schedule(
392 self.file_handler,
393 working_directory,
394 recursive=True,
395 )
396 self.file_observer.start()
397 except Exception as e:
398 logger.warning(f"Failed to start file monitoring: {e}")
400 def stop_monitoring(self) -> None:
401 """Stop interruption monitoring."""
402 # Stop focus tracking
403 self.focus_tracker.stop_monitoring()
405 # Stop file watching
406 if self.file_observer:
407 try:
408 if self.file_observer:
409 self.file_observer.stop()
410 self.file_observer.join(timeout=2.0)
411 except Exception as e:
412 logger.warning(f"Error stopping file observer: {e}")
413 finally:
414 self.file_observer = None
416 async def create_session_context(
417 self,
418 user_id: str,
419 project_id: str | None = None,
420 working_directory: str = ".",
421 ) -> str:
422 """Create new session context."""
423 session_id = f"ctx_{int(time.time() * 1000)}"
425 context = SessionContext(
426 session_id=session_id,
427 user_id=user_id,
428 project_id=project_id,
429 active_app=self.focus_tracker.current_app,
430 active_window=self.focus_tracker.current_window,
431 working_directory=working_directory,
432 open_files=[],
433 cursor_positions={},
434 environment_vars=os.environ.copy() if "os" in globals() else {},
435 process_state={},
436 last_activity=datetime.now(),
437 focus_duration=0.0,
438 interruption_count=0,
439 recovery_attempts=0,
440 )
442 self.current_context = context
444 # Store in database
445 with sqlite3.connect(self.db_path) as conn:
446 conn.execute(
447 """
448 INSERT INTO session_contexts (session_id, user_id, project_id, context_data, state, created_at, updated_at)
449 VALUES (?, ?, ?, ?, ?, ?, ?)
450 """,
451 (
452 session_id,
453 user_id,
454 project_id,
455 json.dumps(asdict(context)),
456 ContextState.ACTIVE.value,
457 datetime.now(),
458 datetime.now(),
459 ),
460 )
462 return session_id
464 async def preserve_context(
465 self,
466 session_id: str | None = None,
467 force: bool = False,
468 ) -> bool:
469 """Preserve current session context."""
470 context = self.current_context
471 if not context:
472 return False
474 session_id = session_id or context.session_id
476 try:
477 # Create context snapshot
478 snapshot_data = {
479 "context": asdict(context),
480 "timestamp": datetime.now().isoformat(),
481 "preservation_reason": "manual" if force else "auto",
482 "environment": self._capture_environment_state(),
483 }
485 # Compress the data using JSON (safer than pickle)
486 compressed_data = None
487 if COMPRESSION_AVAILABLE:
488 try:
489 serialized = json.dumps(snapshot_data).encode()
490 compressed_data = gzip.compress(serialized)
491 except Exception as e:
492 logger.warning(f"Compression failed: {e}")
493 compressed_data = json.dumps(snapshot_data).encode()
494 else:
495 compressed_data = json.dumps(snapshot_data).encode()
497 snapshot_id = f"snap_{int(time.time() * 1000)}"
499 # Store snapshot
500 with sqlite3.connect(self.db_path) as conn:
501 conn.execute(
502 """
503 INSERT INTO context_snapshots (id, session_id, snapshot_type, timestamp, data, metadata)
504 VALUES (?, ?, ?, ?, ?, ?)
505 """,
506 (
507 snapshot_id,
508 session_id,
509 "preservation",
510 datetime.now(),
511 compressed_data,
512 json.dumps(
513 {
514 "compressed": COMPRESSION_AVAILABLE,
515 "size": len(compressed_data),
516 },
517 ),
518 ),
519 )
521 # Update context state
522 conn.execute(
523 """
524 UPDATE session_contexts
525 SET state = ?, preserved_at = ?, updated_at = ?
526 WHERE session_id = ?
527 """,
528 (
529 ContextState.PRESERVED.value,
530 datetime.now(),
531 datetime.now(),
532 session_id,
533 ),
534 )
536 # Execute preservation callbacks
537 for callback in self._preservation_callbacks:
538 try:
539 await callback(context, snapshot_data)
540 except Exception as e:
541 logger.exception(f"Preservation callback error: {e}")
543 return True
545 except Exception as e:
546 logger.exception(f"Context preservation failed: {e}")
547 return False
549 async def restore_context(self, session_id: str) -> SessionContext | None:
550 """Restore session context from snapshot."""
551 try:
552 with sqlite3.connect(self.db_path) as conn:
553 conn.row_factory = sqlite3.Row
555 # Get latest snapshot
556 snapshot_row = conn.execute(
557 """
558 SELECT * FROM context_snapshots
559 WHERE session_id = ? AND snapshot_type = 'preservation'
560 ORDER BY timestamp DESC LIMIT 1
561 """,
562 (session_id,),
563 ).fetchone()
565 if not snapshot_row:
566 return None
568 # Decompress and restore data
569 compressed_data = snapshot_row["data"]
570 metadata = json.loads(snapshot_row["metadata"] or "{}")
572 if metadata.get("compressed", False) and COMPRESSION_AVAILABLE:
573 try:
574 decompressed = gzip.decompress(compressed_data)
575 snapshot_data = json.loads(decompressed.decode())
576 except Exception as e:
577 logger.warning(f"Decompression failed: {e}")
578 snapshot_data = json.loads(compressed_data.decode())
579 else:
580 snapshot_data = json.loads(compressed_data.decode())
582 # Restore context
583 context_dict = snapshot_data["context"]
584 context = SessionContext(**context_dict[str, Any])
585 context.recovery_attempts += 1
587 self.current_context = context
589 # Update database
590 conn.execute(
591 """
592 UPDATE session_contexts
593 SET state = ?, updated_at = ?, restore_count = restore_count + 1
594 WHERE session_id = ?
595 """,
596 (ContextState.RESTORED.value, datetime.now(), session_id),
597 )
599 # Execute restoration callbacks
600 for callback in self._restoration_callbacks:
601 try:
602 await callback(context, snapshot_data)
603 except Exception as e:
604 logger.exception(f"Restoration callback error: {e}")
606 return context
608 except Exception as e:
609 logger.exception(f"Context restoration failed: {e}")
610 return None
612 async def get_interruption_history(
613 self,
614 user_id: str,
615 hours: int = 24,
616 ) -> list[dict[str, Any]]:
617 """Get recent interruption history."""
618 since = datetime.now() - timedelta(hours=hours)
620 with sqlite3.connect(self.db_path) as conn:
621 conn.row_factory = sqlite3.Row
623 cursor = conn.execute(
624 """
625 SELECT * FROM interruption_events
626 WHERE user_id = ? AND timestamp >= ?
627 ORDER BY timestamp DESC
628 """,
629 (user_id, since),
630 )
632 results = []
633 for row in cursor.fetchall():
634 result = dict(row)
635 result["source_context"] = json.loads(result["source_context"] or "{}")
636 result["target_context"] = json.loads(result["target_context"] or "{}")
637 result["recovery_data"] = json.loads(result["recovery_data"] or "{}")
638 results.append(result)
640 return results
642 async def get_context_statistics(self, user_id: str) -> dict[str, Any]:
643 """Get context preservation statistics."""
644 with sqlite3.connect(self.db_path) as conn:
645 conn.row_factory = sqlite3.Row
647 # Get session stats
648 session_stats = conn.execute(
649 """
650 SELECT
651 COUNT(*) as total_sessions,
652 COUNT(CASE WHEN state = 'preserved' THEN 1 END) as preserved_sessions,
653 COUNT(CASE WHEN state = 'restored' THEN 1 END) as restored_sessions,
654 AVG(restore_count) as avg_restore_count
655 FROM session_contexts
656 WHERE user_id = ?
657 """,
658 (user_id,),
659 ).fetchone()
661 # Get interruption stats
662 interruption_stats = conn.execute(
663 """
664 SELECT
665 COUNT(*) as total_interruptions,
666 COUNT(CASE WHEN auto_saved THEN 1 END) as auto_saved_interruptions,
667 AVG(duration) as avg_duration,
668 event_type,
669 COUNT(*) as type_count
670 FROM interruption_events
671 WHERE user_id = ?
672 GROUP BY event_type
673 """,
674 (user_id,),
675 ).fetchall()
677 # Get snapshot stats
678 snapshot_stats = conn.execute(
679 """
680 SELECT
681 COUNT(*) as total_snapshots,
682 SUM(LENGTH(data)) as total_size,
683 AVG(LENGTH(data)) as avg_size
684 FROM context_snapshots cs
685 JOIN session_contexts sc ON cs.session_id = sc.session_id
686 WHERE sc.user_id = ?
687 """,
688 (user_id,),
689 ).fetchone()
691 return {
692 "sessions": dict(session_stats) if session_stats else {},
693 "interruptions": {
694 "total": dict(interruption_stats[0])["total_interruptions"]
695 if interruption_stats
696 else 0,
697 "by_type": [dict(row) for row in interruption_stats]
698 if interruption_stats
699 else [],
700 },
701 "snapshots": dict(snapshot_stats) if snapshot_stats else {},
702 }
704 def register_preservation_callback(self, callback: Callable[..., Any]) -> None:
705 """Register callback for context preservation."""
706 self._preservation_callbacks.append(callback)
708 def register_restoration_callback(self, callback: Callable[..., Any]) -> None:
709 """Register callback for context restoration."""
710 self._restoration_callbacks.append(callback)
712 def _handle_interruption(self, event_data: dict[str, Any]) -> None:
713 """Handle interruption event."""
714 try:
715 interruption_type = event_data["type"]
716 timestamp = event_data["timestamp"]
718 # Auto-save if enabled and threshold met
719 if (
720 self.auto_save_enabled
721 and self.current_context
722 and interruption_type
723 in {InterruptionType.APP_SWITCH, InterruptionType.FOCUS_LOST}
724 ):
725 focus_duration = event_data.get("focus_duration", 0)
726 if focus_duration >= self.save_threshold:
727 asyncio.create_task(self.preserve_context())
729 # Log the interruption
730 event_id = f"int_{int(time.time() * 1000)}"
732 interruption = InterruptionEvent(
733 id=event_id,
734 event_type=interruption_type,
735 timestamp=timestamp,
736 source_context=event_data.get("source_context", {}),
737 target_context=event_data.get("target_context", {}),
738 duration=event_data.get("focus_duration"),
739 recovery_data=None,
740 auto_saved=self.auto_save_enabled,
741 user_id=self.current_context.user_id
742 if self.current_context
743 else "unknown",
744 project_id=self.current_context.project_id
745 if self.current_context
746 else None,
747 )
749 # Store in database
750 asyncio.create_task(self._store_interruption(interruption))
752 # Update current context
753 if self.current_context:
754 self.current_context.interruption_count += 1
755 self.current_context.last_activity = timestamp
757 except Exception as e:
758 logger.exception(f"Interruption handling error: {e}")
760 async def _store_interruption(self, interruption: InterruptionEvent) -> None:
761 """Store interruption event in database."""
762 try:
763 with sqlite3.connect(self.db_path) as conn:
764 conn.execute(
765 """
766 INSERT INTO interruption_events
767 (id, event_type, timestamp, source_context, target_context, duration, recovery_data, auto_saved, user_id, project_id)
768 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
769 """,
770 (
771 interruption.id,
772 interruption.event_type.value,
773 interruption.timestamp,
774 json.dumps(interruption.source_context),
775 json.dumps(interruption.target_context),
776 interruption.duration,
777 json.dumps(interruption.recovery_data or {}),
778 interruption.auto_saved,
779 interruption.user_id,
780 interruption.project_id,
781 ),
782 )
783 except Exception as e:
784 logger.exception(f"Failed to store interruption: {e}")
786 def _capture_environment_state(self) -> dict[str, Any]:
787 """Capture current environment state."""
788 state: dict[str, Any] = {
789 "timestamp": datetime.now().isoformat(),
790 "cwd": Path.cwd().as_posix(),
791 "processes": [],
792 }
794 # Capture running processes (limited for privacy)
795 if PSUTIL_AVAILABLE:
796 try:
797 for proc in psutil.process_iter(["pid", "name"]):
798 try:
799 name = proc.info["name"]
800 if any(
801 keyword in name.lower()
802 for keyword in ("code", "python", "node", "git")
803 ):
804 state["processes"].append(
805 {"pid": proc.info["pid"], "name": name},
806 )
807 except (psutil.NoSuchProcess, psutil.AccessDenied):
808 continue
809 except Exception as e:
810 logger.debug(f"Process capture failed: {e}")
812 return state
# Global manager instance, created on first use by get_interruption_manager()
_interruption_manager: InterruptionManager | None = None


def get_interruption_manager() -> InterruptionManager:
    """Get global interruption manager instance.

    The instance is created with default arguments on the first call and
    reused afterwards.
    """
    global _interruption_manager
    if _interruption_manager is None:
        _interruption_manager = InterruptionManager()
    return _interruption_manager

# Public API functions for MCP tools
async def start_interruption_monitoring(
    working_directory: str = ".",
    watch_files: bool = True,
) -> None:
    """Start interruption monitoring on the global manager.

    Args:
        working_directory: Directory to watch for file changes.
        watch_files: Whether to start the file watcher as well.

    Declared ``async`` although it awaits nothing; callers must still
    ``await`` it.
    """
    get_interruption_manager().start_monitoring(working_directory, watch_files)

def stop_interruption_monitoring() -> None:
    """Stop interruption monitoring on the global manager.

    Unlike the other public helpers in this group, this one is synchronous.
    """
    get_interruption_manager().stop_monitoring()

async def create_session_context(
    user_id: str,
    project_id: str | None = None,
    working_directory: str = ".",
) -> str:
    """Create new session context for interruption management.

    Delegates to the global manager's ``create_session_context``.

    Returns:
        The id of the newly created session.
    """
    manager = get_interruption_manager()
    session_id = await manager.create_session_context(
        user_id,
        project_id,
        working_directory,
    )
    return session_id

async def preserve_current_context(
    session_id: str | None = None,
    force: bool = False,
) -> bool:
    """Preserve current session context.

    Delegates to the global manager's ``preserve_context``.

    Returns:
        Whatever ``preserve_context`` returns for these arguments.
    """
    manager = get_interruption_manager()
    preserved = await manager.preserve_context(session_id, force)
    return preserved

async def restore_session_context(session_id: str) -> dict[str, Any] | None:
    """Restore session context from snapshot.

    Returns:
        The restored context converted with ``dataclasses.asdict``, or
        ``None`` when the manager returned no context.
    """
    manager = get_interruption_manager()
    context = await manager.restore_context(session_id)
    if not context:
        return None
    return asdict(context)

async def get_interruption_history(
    user_id: str,
    hours: int = 24,
) -> list[dict[str, Any]]:
    """Get recent interruption history for user.

    Delegates to the global manager's ``get_interruption_history``.

    Args:
        user_id: User whose events are returned.
        hours: Size of the look-back window in hours.
    """
    manager = get_interruption_manager()
    history = await manager.get_interruption_history(user_id, hours)
    return history

async def get_interruption_statistics(user_id: str) -> dict[str, Any]:
    """Get context preservation and interruption statistics.

    Delegates to the global manager's ``get_context_statistics``.
    """
    manager = get_interruption_manager()
    statistics = await manager.get_context_statistics(user_id)
    return statistics