Coverage for session_mgmt_mcp/interruption_manager.py: 0.00%
353 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
1"""Smart Interruption Management module for context switch detection and auto-save.
3This module provides intelligent interruption handling including:
4- Context switch detection (app/window changes)
5- Automatic session state preservation
6- Smart recovery from interruptions
7- Focus tracking and restoration
8"""
10import asyncio
11import json
12import logging
13import sqlite3
14import threading
15import time
16from collections.abc import Callable
17from dataclasses import asdict, dataclass
18from datetime import datetime, timedelta
19from enum import Enum
20from pathlib import Path
21from typing import Any
23try:
24 import psutil
26 PSUTIL_AVAILABLE = True
27except ImportError:
28 PSUTIL_AVAILABLE = False
30try:
31 from watchdog.events import FileSystemEventHandler
32 from watchdog.observers import Observer
34 WATCHDOG_AVAILABLE = True
35except ImportError:
36 WATCHDOG_AVAILABLE = False
38try:
39 import gzip
40 import pickle
42 COMPRESSION_AVAILABLE = True
43except ImportError:
44 COMPRESSION_AVAILABLE = False
46logger = logging.getLogger(__name__)
49class InterruptionType(Enum):
50 """Types of interruptions detected."""
52 APP_SWITCH = "app_switch"
53 WINDOW_CHANGE = "window_change"
54 SYSTEM_IDLE = "system_idle"
55 FOCUS_LOST = "focus_lost"
56 FILE_CHANGE = "file_change"
57 PROCESS_CHANGE = "process_change"
58 MANUAL_SAVE = "manual_save"
61class ContextState(Enum):
62 """Context preservation states."""
64 ACTIVE = "active"
65 INTERRUPTED = "interrupted"
66 PRESERVED = "preserved"
67 RESTORED = "restored"
68 LOST = "lost"
71@dataclass
72class InterruptionEvent:
73 """Interruption event with context information."""
75 id: str
76 event_type: InterruptionType
77 timestamp: datetime
78 source_context: dict[str, Any]
79 target_context: dict[str, Any]
80 duration: float | None
81 recovery_data: dict[str, Any] | None
82 auto_saved: bool
83 user_id: str
84 project_id: str | None
87@dataclass
88class SessionContext:
89 """Current session context information."""
91 session_id: str
92 user_id: str
93 project_id: str | None
94 active_app: str | None
95 active_window: str | None
96 working_directory: str
97 open_files: list[str]
98 cursor_positions: dict[str, Any]
99 environment_vars: dict[str, str]
100 process_state: dict[str, Any]
101 last_activity: datetime
102 focus_duration: float
103 interruption_count: int
104 recovery_attempts: int
107class FocusTracker:
108 """Tracks application and window focus changes."""
110 def __init__(self, callback: Callable | None = None) -> None:
111 """Initialize focus tracker."""
112 self.callback = callback
113 self.current_app = None
114 self.current_window = None
115 self.last_check = time.time()
116 self.focus_start = time.time()
117 self.running = False
118 self._monitor_thread = None
120 def start_monitoring(self) -> None:
121 """Start focus monitoring."""
122 if self.running:
123 return
125 self.running = True
126 self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
127 self._monitor_thread.start()
129 def stop_monitoring(self) -> None:
130 """Stop focus monitoring."""
131 self.running = False
132 if self._monitor_thread and self._monitor_thread.is_alive():
133 self._monitor_thread.join(timeout=2.0)
135 def _monitor_loop(self) -> None:
136 """Focus monitoring loop."""
137 while self.running:
138 try:
139 self._check_focus_change()
140 time.sleep(1.0) # Check every second
141 except Exception as e:
142 logger.exception(f"Focus monitoring error: {e}")
143 time.sleep(5.0) # Wait longer on error
145 def _check_focus_change(self) -> None:
146 """Check for focus changes using cross-platform methods."""
147 try:
148 current_app = self._get_active_application()
149 current_window = self._get_active_window()
151 now = time.time()
153 # Detect app switch
154 if current_app != self.current_app:
155 focus_duration = now - self.focus_start
157 if self.callback and self.current_app:
158 self.callback(
159 {
160 "type": InterruptionType.APP_SWITCH,
161 "source_app": self.current_app,
162 "target_app": current_app,
163 "focus_duration": focus_duration,
164 "timestamp": datetime.now(),
165 },
166 )
168 self.current_app = current_app
169 self.focus_start = now
171 # Detect window change within same app
172 elif current_window != self.current_window:
173 focus_duration = now - self.focus_start
175 if self.callback and self.current_window:
176 self.callback(
177 {
178 "type": InterruptionType.WINDOW_CHANGE,
179 "source_window": self.current_window,
180 "target_window": current_window,
181 "app": current_app,
182 "focus_duration": focus_duration,
183 "timestamp": datetime.now(),
184 },
185 )
187 self.current_window = current_window
188 self.focus_start = now
190 self.last_check = now
192 except Exception as e:
193 logger.debug(f"Focus check failed: {e}")
195 def _get_active_application(self) -> str | None:
196 """Get currently active application name."""
197 if not PSUTIL_AVAILABLE:
198 return None
200 try:
201 # Try to get the foreground process
202 # This is a simplified cross-platform approach
203 for proc in psutil.process_iter(["pid", "name"]):
204 try:
205 # Basic heuristic: look for common GUI applications
206 name = proc.info["name"]
207 if any(
208 gui_hint in name.lower()
209 for gui_hint in ["code", "browser", "terminal", "editor", "ide"]
210 ):
211 return name
212 except (psutil.NoSuchProcess, psutil.AccessDenied):
213 continue
215 return "Unknown"
217 except Exception:
218 return None
220 def _get_active_window(self) -> str | None:
221 """Get currently active window title."""
222 # This would require platform-specific implementations
223 # For now, return a placeholder
224 return f"Window_{int(time.time() % 1000)}"
227class FileChangeHandler(FileSystemEventHandler):
228 """Handles file system change events."""
230 def __init__(self, callback: Callable | None = None) -> None:
231 """Initialize file change handler."""
232 super().__init__()
233 self.callback = callback
234 self.last_events = {}
235 self.debounce_time = 1.0 # Seconds
237 def on_modified(self, event) -> None:
238 """Handle file modification."""
239 if event.is_directory:
240 return
242 now = time.time()
243 file_path = event.src_path
245 # Debounce rapid changes
246 if file_path in self.last_events:
247 if now - self.last_events[file_path] < self.debounce_time:
248 return
250 self.last_events[file_path] = now
252 if self.callback:
253 self.callback(
254 {
255 "type": InterruptionType.FILE_CHANGE,
256 "file_path": file_path,
257 "event_type": "modified",
258 "timestamp": datetime.now(),
259 },
260 )
262 def on_created(self, event) -> None:
263 """Handle file creation."""
264 if event.is_directory:
265 return
267 if self.callback:
268 self.callback(
269 {
270 "type": InterruptionType.FILE_CHANGE,
271 "file_path": event.src_path,
272 "event_type": "created",
273 "timestamp": datetime.now(),
274 },
275 )
277 def on_deleted(self, event) -> None:
278 """Handle file deletion."""
279 if event.is_directory:
280 return
282 if self.callback:
283 self.callback(
284 {
285 "type": InterruptionType.FILE_CHANGE,
286 "file_path": event.src_path,
287 "event_type": "deleted",
288 "timestamp": datetime.now(),
289 },
290 )
293class InterruptionManager:
294 """Manages interruption detection and context preservation."""
296 def __init__(self, db_path: str | None = None) -> None:
297 """Initialize interruption manager."""
298 self.db_path = db_path or str(
299 Path.home() / ".claude" / "data" / "interruption_manager.db",
300 )
301 self._lock = threading.Lock()
302 self.current_context: SessionContext | None = None
303 self.focus_tracker = FocusTracker(callback=self._handle_interruption)
304 self.file_observer: Observer | None = None
305 self.file_handler = FileChangeHandler(callback=self._handle_interruption)
306 self.auto_save_enabled = True
307 self.save_threshold = 30.0 # Auto-save after 30 seconds of focus
308 self.idle_threshold = 300.0 # 5 minutes idle detection
309 self._preservation_callbacks: list[Callable] = []
310 self._restoration_callbacks: list[Callable] = []
311 self._init_database()
313 def _init_database(self) -> None:
314 """Initialize SQLite database for interruption tracking."""
315 Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
317 with sqlite3.connect(self.db_path) as conn:
318 conn.execute("""
319 CREATE TABLE IF NOT EXISTS interruption_events (
320 id TEXT PRIMARY KEY,
321 event_type TEXT NOT NULL,
322 timestamp TIMESTAMP,
323 source_context TEXT, -- JSON
324 target_context TEXT, -- JSON
325 duration REAL,
326 recovery_data TEXT, -- JSON
327 auto_saved BOOLEAN,
328 user_id TEXT NOT NULL,
329 project_id TEXT
330 )
331 """)
333 conn.execute("""
334 CREATE TABLE IF NOT EXISTS session_contexts (
335 session_id TEXT PRIMARY KEY,
336 user_id TEXT NOT NULL,
337 project_id TEXT,
338 context_data TEXT, -- JSON
339 state TEXT NOT NULL,
340 created_at TIMESTAMP,
341 updated_at TIMESTAMP,
342 preserved_at TIMESTAMP,
343 restore_count INTEGER DEFAULT 0
344 )
345 """)
347 conn.execute("""
348 CREATE TABLE IF NOT EXISTS context_snapshots (
349 id TEXT PRIMARY KEY,
350 session_id TEXT NOT NULL,
351 snapshot_type TEXT NOT NULL,
352 timestamp TIMESTAMP,
353 data BLOB, -- Compressed context data
354 metadata TEXT -- JSON
355 )
356 """)
358 # Create indices
359 conn.execute(
360 "CREATE INDEX IF NOT EXISTS idx_interruptions_timestamp ON interruption_events(timestamp)",
361 )
362 conn.execute(
363 "CREATE INDEX IF NOT EXISTS idx_interruptions_user ON interruption_events(user_id)",
364 )
365 conn.execute(
366 "CREATE INDEX IF NOT EXISTS idx_contexts_user ON session_contexts(user_id)",
367 )
368 conn.execute(
369 "CREATE INDEX IF NOT EXISTS idx_contexts_state ON session_contexts(state)",
370 )
371 conn.execute(
372 "CREATE INDEX IF NOT EXISTS idx_snapshots_session ON context_snapshots(session_id)",
373 )
375 def start_monitoring(
376 self, working_directory: str = ".", watch_files: bool = True
377 ) -> None:
378 """Start interruption monitoring."""
379 # Start focus tracking
380 self.focus_tracker.start_monitoring()
382 # Start file watching if requested
383 if watch_files and WATCHDOG_AVAILABLE:
384 try:
385 self.file_observer = Observer()
386 self.file_observer.schedule(
387 self.file_handler,
388 working_directory,
389 recursive=True,
390 )
391 self.file_observer.start()
392 except Exception as e:
393 logger.warning(f"Failed to start file monitoring: {e}")
395 def stop_monitoring(self) -> None:
396 """Stop interruption monitoring."""
397 # Stop focus tracking
398 self.focus_tracker.stop_monitoring()
400 # Stop file watching
401 if self.file_observer:
402 try:
403 self.file_observer.stop()
404 self.file_observer.join(timeout=2.0)
405 except Exception as e:
406 logger.warning(f"Error stopping file observer: {e}")
407 finally:
408 self.file_observer = None
410 async def create_session_context(
411 self,
412 user_id: str,
413 project_id: str | None = None,
414 working_directory: str = ".",
415 ) -> str:
416 """Create new session context."""
417 session_id = f"ctx_{int(time.time() * 1000)}"
419 context = SessionContext(
420 session_id=session_id,
421 user_id=user_id,
422 project_id=project_id,
423 active_app=self.focus_tracker.current_app,
424 active_window=self.focus_tracker.current_window,
425 working_directory=working_directory,
426 open_files=[],
427 cursor_positions={},
428 environment_vars=dict(os.environ) if "os" in globals() else {},
429 process_state={},
430 last_activity=datetime.now(),
431 focus_duration=0.0,
432 interruption_count=0,
433 recovery_attempts=0,
434 )
436 self.current_context = context
438 # Store in database
439 with sqlite3.connect(self.db_path) as conn:
440 conn.execute(
441 """
442 INSERT INTO session_contexts (session_id, user_id, project_id, context_data, state, created_at, updated_at)
443 VALUES (?, ?, ?, ?, ?, ?, ?)
444 """,
445 (
446 session_id,
447 user_id,
448 project_id,
449 json.dumps(asdict(context)),
450 ContextState.ACTIVE.value,
451 datetime.now(),
452 datetime.now(),
453 ),
454 )
456 return session_id
458 async def preserve_context(
459 self,
460 session_id: str | None = None,
461 force: bool = False,
462 ) -> bool:
463 """Preserve current session context."""
464 context = self.current_context
465 if not context:
466 return False
468 session_id = session_id or context.session_id
470 try:
471 # Create context snapshot
472 snapshot_data = {
473 "context": asdict(context),
474 "timestamp": datetime.now().isoformat(),
475 "preservation_reason": "manual" if force else "auto",
476 "environment": self._capture_environment_state(),
477 }
479 # Compress the data
480 compressed_data = None
481 if COMPRESSION_AVAILABLE:
482 try:
483 serialized = pickle.dumps(snapshot_data)
484 compressed_data = gzip.compress(serialized)
485 except Exception as e:
486 logger.warning(f"Compression failed: {e}")
487 compressed_data = json.dumps(snapshot_data).encode()
488 else:
489 compressed_data = json.dumps(snapshot_data).encode()
491 snapshot_id = f"snap_{int(time.time() * 1000)}"
493 # Store snapshot
494 with sqlite3.connect(self.db_path) as conn:
495 conn.execute(
496 """
497 INSERT INTO context_snapshots (id, session_id, snapshot_type, timestamp, data, metadata)
498 VALUES (?, ?, ?, ?, ?, ?)
499 """,
500 (
501 snapshot_id,
502 session_id,
503 "preservation",
504 datetime.now(),
505 compressed_data,
506 json.dumps(
507 {
508 "compressed": COMPRESSION_AVAILABLE,
509 "size": len(compressed_data),
510 },
511 ),
512 ),
513 )
515 # Update context state
516 conn.execute(
517 """
518 UPDATE session_contexts
519 SET state = ?, preserved_at = ?, updated_at = ?
520 WHERE session_id = ?
521 """,
522 (
523 ContextState.PRESERVED.value,
524 datetime.now(),
525 datetime.now(),
526 session_id,
527 ),
528 )
530 # Execute preservation callbacks
531 for callback in self._preservation_callbacks:
532 try:
533 await callback(context, snapshot_data)
534 except Exception as e:
535 logger.exception(f"Preservation callback error: {e}")
537 return True
539 except Exception as e:
540 logger.exception(f"Context preservation failed: {e}")
541 return False
543 async def restore_context(self, session_id: str) -> SessionContext | None:
544 """Restore session context from snapshot."""
545 try:
546 with sqlite3.connect(self.db_path) as conn:
547 conn.row_factory = sqlite3.Row
549 # Get latest snapshot
550 snapshot_row = conn.execute(
551 """
552 SELECT * FROM context_snapshots
553 WHERE session_id = ? AND snapshot_type = 'preservation'
554 ORDER BY timestamp DESC LIMIT 1
555 """,
556 (session_id,),
557 ).fetchone()
559 if not snapshot_row:
560 return None
562 # Decompress and restore data
563 compressed_data = snapshot_row["data"]
564 metadata = json.loads(snapshot_row["metadata"] or "{}")
566 if metadata.get("compressed", False) and COMPRESSION_AVAILABLE:
567 try:
568 decompressed = gzip.decompress(compressed_data)
569 snapshot_data = pickle.loads(decompressed)
570 except Exception as e:
571 logger.warning(f"Decompression failed: {e}")
572 snapshot_data = json.loads(compressed_data.decode())
573 else:
574 snapshot_data = json.loads(compressed_data.decode())
576 # Restore context
577 context_dict = snapshot_data["context"]
578 context = SessionContext(**context_dict)
579 context.recovery_attempts += 1
581 self.current_context = context
583 # Update database
584 conn.execute(
585 """
586 UPDATE session_contexts
587 SET state = ?, updated_at = ?, restore_count = restore_count + 1
588 WHERE session_id = ?
589 """,
590 (ContextState.RESTORED.value, datetime.now(), session_id),
591 )
593 # Execute restoration callbacks
594 for callback in self._restoration_callbacks:
595 try:
596 await callback(context, snapshot_data)
597 except Exception as e:
598 logger.exception(f"Restoration callback error: {e}")
600 return context
602 except Exception as e:
603 logger.exception(f"Context restoration failed: {e}")
604 return None
606 async def get_interruption_history(
607 self,
608 user_id: str,
609 hours: int = 24,
610 ) -> list[dict[str, Any]]:
611 """Get recent interruption history."""
612 since = datetime.now() - timedelta(hours=hours)
614 with sqlite3.connect(self.db_path) as conn:
615 conn.row_factory = sqlite3.Row
617 cursor = conn.execute(
618 """
619 SELECT * FROM interruption_events
620 WHERE user_id = ? AND timestamp >= ?
621 ORDER BY timestamp DESC
622 """,
623 (user_id, since),
624 )
626 results = []
627 for row in cursor.fetchall():
628 result = dict(row)
629 result["source_context"] = json.loads(result["source_context"] or "{}")
630 result["target_context"] = json.loads(result["target_context"] or "{}")
631 result["recovery_data"] = json.loads(result["recovery_data"] or "{}")
632 results.append(result)
634 return results
636 async def get_context_statistics(self, user_id: str) -> dict[str, Any]:
637 """Get context preservation statistics."""
638 with sqlite3.connect(self.db_path) as conn:
639 conn.row_factory = sqlite3.Row
641 # Get session stats
642 session_stats = conn.execute(
643 """
644 SELECT
645 COUNT(*) as total_sessions,
646 COUNT(CASE WHEN state = 'preserved' THEN 1 END) as preserved_sessions,
647 COUNT(CASE WHEN state = 'restored' THEN 1 END) as restored_sessions,
648 AVG(restore_count) as avg_restore_count
649 FROM session_contexts
650 WHERE user_id = ?
651 """,
652 (user_id,),
653 ).fetchone()
655 # Get interruption stats
656 interruption_stats = conn.execute(
657 """
658 SELECT
659 COUNT(*) as total_interruptions,
660 COUNT(CASE WHEN auto_saved THEN 1 END) as auto_saved_interruptions,
661 AVG(duration) as avg_duration,
662 event_type,
663 COUNT(*) as type_count
664 FROM interruption_events
665 WHERE user_id = ?
666 GROUP BY event_type
667 """,
668 (user_id,),
669 ).fetchall()
671 # Get snapshot stats
672 snapshot_stats = conn.execute(
673 """
674 SELECT
675 COUNT(*) as total_snapshots,
676 SUM(LENGTH(data)) as total_size,
677 AVG(LENGTH(data)) as avg_size
678 FROM context_snapshots cs
679 JOIN session_contexts sc ON cs.session_id = sc.session_id
680 WHERE sc.user_id = ?
681 """,
682 (user_id,),
683 ).fetchone()
685 return {
686 "sessions": dict(session_stats) if session_stats else {},
687 "interruptions": {
688 "total": dict(interruption_stats[0])["total_interruptions"]
689 if interruption_stats
690 else 0,
691 "by_type": [dict(row) for row in interruption_stats]
692 if interruption_stats
693 else [],
694 },
695 "snapshots": dict(snapshot_stats) if snapshot_stats else {},
696 }
698 def register_preservation_callback(self, callback: Callable) -> None:
699 """Register callback for context preservation."""
700 self._preservation_callbacks.append(callback)
702 def register_restoration_callback(self, callback: Callable) -> None:
703 """Register callback for context restoration."""
704 self._restoration_callbacks.append(callback)
706 def _handle_interruption(self, event_data: dict[str, Any]) -> None:
707 """Handle interruption event."""
708 try:
709 interruption_type = event_data["type"]
710 timestamp = event_data["timestamp"]
712 # Auto-save if enabled and threshold met
713 if (
714 self.auto_save_enabled
715 and self.current_context
716 and interruption_type
717 in [InterruptionType.APP_SWITCH, InterruptionType.FOCUS_LOST]
718 ):
719 focus_duration = event_data.get("focus_duration", 0)
720 if focus_duration >= self.save_threshold:
721 asyncio.create_task(self.preserve_context())
723 # Log the interruption
724 event_id = f"int_{int(time.time() * 1000)}"
726 interruption = InterruptionEvent(
727 id=event_id,
728 event_type=interruption_type,
729 timestamp=timestamp,
730 source_context=event_data.get("source_context", {}),
731 target_context=event_data.get("target_context", {}),
732 duration=event_data.get("focus_duration"),
733 recovery_data=None,
734 auto_saved=self.auto_save_enabled,
735 user_id=self.current_context.user_id
736 if self.current_context
737 else "unknown",
738 project_id=self.current_context.project_id
739 if self.current_context
740 else None,
741 )
743 # Store in database
744 asyncio.create_task(self._store_interruption(interruption))
746 # Update current context
747 if self.current_context:
748 self.current_context.interruption_count += 1
749 self.current_context.last_activity = timestamp
751 except Exception as e:
752 logger.exception(f"Interruption handling error: {e}")
754 async def _store_interruption(self, interruption: InterruptionEvent) -> None:
755 """Store interruption event in database."""
756 try:
757 with sqlite3.connect(self.db_path) as conn:
758 conn.execute(
759 """
760 INSERT INTO interruption_events
761 (id, event_type, timestamp, source_context, target_context, duration, recovery_data, auto_saved, user_id, project_id)
762 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
763 """,
764 (
765 interruption.id,
766 interruption.event_type.value,
767 interruption.timestamp,
768 json.dumps(interruption.source_context),
769 json.dumps(interruption.target_context),
770 interruption.duration,
771 json.dumps(interruption.recovery_data or {}),
772 interruption.auto_saved,
773 interruption.user_id,
774 interruption.project_id,
775 ),
776 )
777 except Exception as e:
778 logger.exception(f"Failed to store interruption: {e}")
780 def _capture_environment_state(self) -> dict[str, Any]:
781 """Capture current environment state."""
782 state = {
783 "timestamp": datetime.now().isoformat(),
784 "cwd": Path.cwd().as_posix(),
785 "processes": [],
786 }
788 # Capture running processes (limited for privacy)
789 if PSUTIL_AVAILABLE:
790 try:
791 for proc in psutil.process_iter(["pid", "name"]):
792 try:
793 name = proc.info["name"]
794 if any(
795 keyword in name.lower()
796 for keyword in ["code", "python", "node", "git"]
797 ):
798 state["processes"].append(
799 {"pid": proc.info["pid"], "name": name},
800 )
801 except (psutil.NoSuchProcess, psutil.AccessDenied):
802 continue
803 except Exception as e:
804 logger.debug(f"Process capture failed: {e}")
806 return state
809# Global manager instance
810_interruption_manager = None
813def get_interruption_manager() -> InterruptionManager:
814 """Get global interruption manager instance."""
815 global _interruption_manager
816 if _interruption_manager is None:
817 _interruption_manager = InterruptionManager()
818 return _interruption_manager
821# Public API functions for MCP tools
822async def start_interruption_monitoring(
823 working_directory: str = ".",
824 watch_files: bool = True,
825) -> None:
826 """Start interruption monitoring."""
827 manager = get_interruption_manager()
828 manager.start_monitoring(working_directory, watch_files)
831def stop_interruption_monitoring() -> None:
832 """Stop interruption monitoring."""
833 manager = get_interruption_manager()
834 manager.stop_monitoring()
837async def create_session_context(
838 user_id: str,
839 project_id: str | None = None,
840 working_directory: str = ".",
841) -> str:
842 """Create new session context for interruption management."""
843 manager = get_interruption_manager()
844 return await manager.create_session_context(user_id, project_id, working_directory)
847async def preserve_current_context(
848 session_id: str | None = None,
849 force: bool = False,
850) -> bool:
851 """Preserve current session context."""
852 manager = get_interruption_manager()
853 return await manager.preserve_context(session_id, force)
856async def restore_session_context(session_id: str) -> dict[str, Any] | None:
857 """Restore session context from snapshot."""
858 manager = get_interruption_manager()
859 context = await manager.restore_context(session_id)
860 return asdict(context) if context else None
863async def get_interruption_history(
864 user_id: str,
865 hours: int = 24,
866) -> list[dict[str, Any]]:
867 """Get recent interruption history for user."""
868 manager = get_interruption_manager()
869 return await manager.get_interruption_history(user_id, hours)
872async def get_interruption_statistics(user_id: str) -> dict[str, Any]:
873 """Get context preservation and interruption statistics."""
874 manager = get_interruption_manager()
875 return await manager.get_context_statistics(user_id)