Coverage for session_mgmt_mcp/server.py: 14.11%
3686 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
1#!/usr/bin/env python3
2"""Claude Session Management MCP Server - FastMCP Version.
4A dedicated MCP server that provides session management functionality
5including initialization, checkpoints, and cleanup across all projects.
7This server can be included in any project's .mcp.json file to provide
8automatic access to /session-init, /session-checkpoint,
9and /session-end slash commands.
10"""
12import asyncio
13import hashlib
14import json
15import logging
16import os
17import shutil
18import subprocess
19import sys
20from datetime import datetime
21from pathlib import Path
22from typing import Any
25# Configure structured logging
26class SessionLogger:
27 """Structured logging for session management with context."""
29 def __init__(self, log_dir: Path) -> None:
30 self.log_dir = log_dir
31 self.log_dir.mkdir(parents=True, exist_ok=True)
32 self.log_file = (
33 log_dir / f"session_management_{datetime.now().strftime('%Y%m%d')}.log"
34 )
36 # Configure logger
37 self.logger = logging.getLogger("session_management")
38 self.logger.setLevel(logging.INFO)
40 # Avoid duplicate handlers
41 if not self.logger.handlers: 41 ↛ exitline 41 didn't return from function '__init__' because the condition on line 41 was always true
42 # File handler with structured format
43 file_handler = logging.FileHandler(self.log_file)
44 file_handler.setLevel(logging.INFO)
46 # Console handler for errors
47 console_handler = logging.StreamHandler(sys.stderr)
48 console_handler.setLevel(logging.ERROR)
50 # Structured formatter
51 formatter = logging.Formatter(
52 "%(asctime)s | %(levelname)s | %(funcName)s:%(lineno)d | %(message)s",
53 )
54 file_handler.setFormatter(formatter)
55 console_handler.setFormatter(formatter)
57 self.logger.addHandler(file_handler)
58 self.logger.addHandler(console_handler)
60 def info(self, message: str, **context) -> None:
61 """Log info with optional context."""
62 if context:
63 message = f"{message} | Context: {json.dumps(context)}"
64 self.logger.info(message)
66 def warning(self, message: str, **context) -> None:
67 """Log warning with optional context."""
68 if context:
69 message = f"{message} | Context: {json.dumps(context)}"
70 self.logger.warning(message)
72 def error(self, message: str, **context) -> None:
73 """Log error with optional context."""
74 if context:
75 message = f"{message} | Context: {json.dumps(context)}"
76 self.logger.error(message)
# Initialize logger
# NOTE: this runs at import time. SessionLogger.__init__ creates the
# ~/.claude/logs directory (including parents) and attaches handlers to the
# shared "session_management" logger, so importing this module touches disk.
claude_dir = Path.home() / ".claude"
session_logger = SessionLogger(claude_dir / "logs")
try:
    from fastmcp import FastMCP

    MCP_AVAILABLE = True
except ImportError:
    # Check if we're in a test environment. sys.argv may be empty in some
    # embedded interpreters, so guard the index.
    if "pytest" in sys.modules or "test" in (sys.argv[0].lower() if sys.argv else ""):
        print(
            "Warning: FastMCP not available in test environment, using mock",
            file=sys.stderr,
        )

        # Create a minimal mock FastMCP for testing
        class MockFastMCP:
            """No-op stand-in for FastMCP so the module can be imported in tests.

            ``tool`` and ``prompt`` return the decorated function unchanged and
            ``run`` does nothing; nothing is actually registered.
            """

            def __init__(self, name) -> None:
                self.name = name
                self.tools = {}
                self.prompts = {}

            @staticmethod
            def _passthrough(args, kwargs):
                """Support both ``@deco`` and ``@deco(...)`` usage."""
                # Bare usage: the decorated function arrives as the only argument.
                if len(args) == 1 and callable(args[0]) and not kwargs:
                    return args[0]

                def decorator(func):
                    return func

                return decorator

            def tool(self, *args, **kwargs):
                return self._passthrough(args, kwargs)

            def prompt(self, *args, **kwargs):
                return self._passthrough(args, kwargs)

            def run(self, *args, **kwargs) -> None:
                pass

        FastMCP = MockFastMCP
        MCP_AVAILABLE = False
    else:
        print("FastMCP not available. Install with: uv add fastmcp", file=sys.stderr)
        sys.exit(1)
123# Session management availability (no longer using global workspace)
124SESSION_MANAGEMENT_AVAILABLE = False
126# Import reflection tools
127try:
128 from session_mgmt_mcp.reflection_tools import (
129 get_current_project,
130 get_reflection_database,
131 )
133 REFLECTION_TOOLS_AVAILABLE = True
134except ImportError as e:
135 print(f"Reflection tools import failed: {e}", file=sys.stderr)
136 REFLECTION_TOOLS_AVAILABLE = False
138# Import enhanced search tools
139try:
140 from session_mgmt_mcp.search_enhanced import EnhancedSearchEngine
142 ENHANCED_SEARCH_AVAILABLE = True
143except ImportError as e:
144 print(f"Enhanced search import failed: {e}", file=sys.stderr)
145 ENHANCED_SEARCH_AVAILABLE = False
147# Import multi-project coordination tools
148try:
149 from session_mgmt_mcp.multi_project_coordinator import MultiProjectCoordinator
151 MULTI_PROJECT_AVAILABLE = True
152except ImportError as e:
153 print(f"Multi-project coordinator import failed: {e}", file=sys.stderr)
154 MULTI_PROJECT_AVAILABLE = False
156# Import advanced search engine
157try:
158 from session_mgmt_mcp.advanced_search import AdvancedSearchEngine
160 ADVANCED_SEARCH_AVAILABLE = True
161except ImportError as e:
162 print(f"Advanced search engine import failed: {e}", file=sys.stderr)
163 ADVANCED_SEARCH_AVAILABLE = False
165# Import configuration management
166try:
167 from session_mgmt_mcp.config import get_config
169 CONFIG_AVAILABLE = True
170except ImportError as e:
171 print(f"Configuration management import failed: {e}", file=sys.stderr)
172 CONFIG_AVAILABLE = False
174# Import auto-context loading tools
175try:
176 from session_mgmt_mcp.context_manager import AutoContextLoader
178 AUTO_CONTEXT_AVAILABLE = True
179except ImportError as e:
180 print(f"Auto-context loading import failed: {e}", file=sys.stderr)
181 AUTO_CONTEXT_AVAILABLE = False
183# Import memory optimization tools
184try:
185 from session_mgmt_mcp.memory_optimizer import MemoryOptimizer
187 MEMORY_OPTIMIZER_AVAILABLE = True
188except ImportError as e:
189 print(f"Memory optimizer import failed: {e}", file=sys.stderr)
190 MEMORY_OPTIMIZER_AVAILABLE = False
192# Import application monitoring tools
193try:
194 from session_mgmt_mcp.app_monitor import ApplicationMonitor
196 APP_MONITOR_AVAILABLE = True
197except ImportError as e:
198 print(f"Application monitoring import failed: {e}", file=sys.stderr)
199 APP_MONITOR_AVAILABLE = False
201# Import LLM providers
202try:
203 from session_mgmt_mcp.llm_providers import LLMManager, LLMMessage
205 LLM_PROVIDERS_AVAILABLE = True
206except ImportError as e:
207 print(f"LLM providers import failed: {e}", file=sys.stderr)
208 LLM_PROVIDERS_AVAILABLE = False
210# Import serverless mode
211try:
212 from session_mgmt_mcp.serverless_mode import (
213 ServerlessConfigManager,
214 ServerlessSessionManager,
215 )
217 SERVERLESS_MODE_AVAILABLE = True
218except ImportError as e:
219 print(f"Serverless mode import failed: {e}", file=sys.stderr)
220 SERVERLESS_MODE_AVAILABLE = False
222# Import Crackerjack integration tools
223try:
224 from session_mgmt_mcp.crackerjack_integration import CrackerjackIntegration
226 CRACKERJACK_INTEGRATION_AVAILABLE = True
227except ImportError as e:
228 print(f"Crackerjack integration import failed: {e}", file=sys.stderr)
229 CRACKERJACK_INTEGRATION_AVAILABLE = False
class SessionPermissionsManager:
    """Manages session permissions to avoid repeated prompts for trusted operations.

    The class is a process-wide singleton: the first construction wins, and
    every later ``SessionPermissionsManager(...)`` call returns that same
    instance, ignoring the ``claude_dir`` passed to it. Trusted operations are
    persisted as JSON in ``<claude_dir>/sessions/trusted_permissions.json``.
    """

    # Common trusted operations
    TRUSTED_UV_OPERATIONS = "uv_package_management"
    TRUSTED_GIT_OPERATIONS = "git_repository_access"
    TRUSTED_FILE_OPERATIONS = "project_file_access"
    TRUSTED_SUBPROCESS_OPERATIONS = "subprocess_execution"
    TRUSTED_NETWORK_OPERATIONS = "network_access"
    # TRUSTED_WORKSPACE_OPERATIONS removed - no longer needed

    _instance = None
    _session_id = None

    def __new__(cls, claude_dir: Path):
        """Singleton pattern to ensure consistent session ID across tool calls."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, claude_dir: Path) -> None:
        """Set up paths and load persisted permissions (first call only).

        Args:
            claude_dir: Base directory; permissions live under ``sessions/``.
        """
        # __init__ runs on every SessionPermissionsManager(...) call, even
        # though __new__ hands back the existing instance; do the work once.
        if self._initialized:
            return
        self.claude_dir = claude_dir
        self.permissions_file = claude_dir / "sessions" / "trusted_permissions.json"
        # parents=True so a missing base directory does not abort construction.
        self.permissions_file.parent.mkdir(parents=True, exist_ok=True)
        self.trusted_operations: set[str] = set()
        # Use class-level session ID to persist across instances
        if SessionPermissionsManager._session_id is None:
            SessionPermissionsManager._session_id = self._generate_session_id()
        self.session_id = SessionPermissionsManager._session_id
        self._load_permissions()
        self._initialized = True

    def _generate_session_id(self) -> str:
        """Generate unique session ID based on current time and working directory."""
        session_data = f"{datetime.now().isoformat()}_{os.getcwd()}"
        # The digest is only an identifier, not a security primitive.
        digest = hashlib.md5(session_data.encode(), usedforsecurity=False)
        return digest.hexdigest()[:12]

    def _load_permissions(self) -> None:
        """Load previously granted permissions.

        Best-effort: a missing file is normal, and an unreadable or malformed
        file is ignored (with a warning) rather than raised.
        """
        try:
            with open(self.permissions_file, encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logging.getLogger("session_management").warning(
                "Ignoring unreadable permissions file %s: %s",
                self.permissions_file,
                exc,
            )
            return

        operations = data.get("trusted_operations", []) if isinstance(data, dict) else None
        if not isinstance(operations, list):
            logging.getLogger("session_management").warning(
                "Ignoring malformed permissions file %s",
                self.permissions_file,
            )
            return
        self.trusted_operations.update(op for op in operations if isinstance(op, str))

    def _save_permissions(self) -> None:
        """Save current trusted permissions."""
        data = {
            # Sorted so the file content is stable between saves.
            "trusted_operations": sorted(self.trusted_operations),
            "last_updated": datetime.now().isoformat(),
            "session_id": self.session_id,
        }
        with open(self.permissions_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    def is_operation_trusted(self, operation: str) -> bool:
        """Check if an operation is already trusted."""
        return operation in self.trusted_operations

    def trust_operation(self, operation: str, description: str = "") -> None:
        """Mark an operation as trusted to avoid future prompts.

        Args:
            operation: Identifier to trust (e.g. one of the ``TRUSTED_*`` constants).
            description: Currently unused; kept for interface compatibility.
        """
        self.trusted_operations.add(operation)
        self._save_permissions()

    def get_permission_status(self) -> dict[str, Any]:
        """Get current permission status."""
        return {
            "session_id": self.session_id,
            "trusted_operations_count": len(self.trusted_operations),
            "trusted_operations": sorted(self.trusted_operations),
            "permissions_file": str(self.permissions_file),
        }

    def revoke_all_permissions(self) -> None:
        """Revoke all trusted permissions (for security reset)."""
        self.trusted_operations.clear()
        # missing_ok avoids the exists()/unlink() race and makes this idempotent.
        self.permissions_file.unlink(missing_ok=True)
317# Utility Functions
318def _detect_other_mcp_servers() -> dict[str, bool]:
319 """Detect availability of other MCP servers by checking common paths and processes."""
320 detected = {}
322 # Check for crackerjack MCP server
323 try:
324 # Try to import crackerjack to see if it's available
325 result = subprocess.run(
326 ["crackerjack", "--version"],
327 check=False,
328 capture_output=True,
329 text=True,
330 timeout=5,
331 )
332 detected["crackerjack"] = result.returncode == 0
333 except (subprocess.SubprocessError, FileNotFoundError, subprocess.TimeoutExpired):
334 detected["crackerjack"] = False
336 return detected
339def _generate_server_guidance(detected_servers: dict[str, bool]) -> list[str]:
340 """Generate guidance messages based on detected servers."""
341 guidance = []
343 if detected_servers.get("crackerjack", False):
344 guidance.extend(
345 [
346 "💡 CRACKERJACK INTEGRATION DETECTED:",
347 " Enhanced commands available for better development experience:",
348 " • Use /session-mgmt:crackerjack-run instead of /crackerjack:run",
349 " • Gets memory, analytics, and intelligent insights automatically",
350 " • View trends with /session-mgmt:crackerjack-history",
351 " • Analyze patterns with /session-mgmt:crackerjack-patterns",
352 ],
353 )
355 return guidance
# Initialize FastMCP 2.0 server
# NOTE: when the fastmcp package is missing under a test run, FastMCP is bound
# to the local MockFastMCP stand-in, whose decorators register nothing.
mcp = FastMCP("session-mgmt-mcp")
361# Register slash commands as MCP prompts (not resources!)
362SESSION_COMMANDS = {
363 "init": """# Session Initialization
365Initialize Claude session with comprehensive setup including UV dependencies and automation tools.
367This command will:
368- Set up Claude directory structure
369- Check for required toolkits and automation tools
370- Initialize project context and dependencies
371- Set up performance monitoring and session tracking
372- Provide comprehensive setup summary with next steps
374Run this at the start of any coding session for optimal Claude integration.
375""",
376 "checkpoint": """# Session Checkpoint
378Perform mid-session quality checkpoint with workflow analysis, optimization recommendations, and strategic compaction.
380This command will:
381- Analyze current session progress and workflow effectiveness
382- Check for performance bottlenecks and optimization opportunities
383- Validate current task completion status
384- Provide recommendations for workflow improvements
385- Create checkpoint for session recovery if needed
386- **Analyze context usage and recommend /compact when beneficial**
387- Perform strategic compaction and cleanup (replaces disabled auto-compact):
388 • DuckDB reflection database optimization (VACUUM/ANALYZE)
389 • Session log cleanup (retain last 10 files)
390 • Temporary file cleanup (cache files, .DS_Store, old coverage files)
391 • Git repository optimization (gc --auto, prune remote branches)
392 • UV package cache cleanup
394**RECOMMENDED WORKFLOW:**
3951. Run /session-mgmt:checkpoint for comprehensive analysis
3962. If checkpoint recommends context compaction, run: `/compact`
3973. Continue with optimized session context
399Use this periodically during long coding sessions to maintain optimal productivity and system performance.
400""",
401 "end": """# Session End
403End Claude session with cleanup, learning capture, and handoff file creation.
405This command will:
406- Clean up temporary files and session artifacts
407- Capture learning outcomes and session insights
408- Create handoff documentation for future sessions
409- Save session analytics and performance metrics
410- Provide session summary with accomplishments
412Run this at the end of any coding session for proper cleanup and knowledge retention.
413""",
414 "status": """# Session Status
416Get current session status and project context information with health checks.
418This command will:
419- Display current project context and workspace information
420- Show comprehensive health checks for all system components
421- Report session permissions and trusted operations
422- Analyze project maturity and structure
423- List all available MCP tools and their status
425Use this to quickly understand your current session state and system health.
426""",
427 "permissions": """# Session Permissions
429Manage session permissions for trusted operations to avoid repeated prompts.
431This command will:
432- Show current trusted operations and permission status
433- Allow you to manually trust specific operations
434- Provide security reset option to revoke all permissions
435- Display available operation types that can be trusted
436- Help streamline workflow by reducing permission prompts
438Use this to manage which operations Claude can perform without prompting.
439""",
440 "reflect": """# Reflect on Past
442Search past conversations and store reflections with semantic similarity.
444This command will:
445- Search through your conversation history using AI embeddings
446- Find relevant past discussions and solutions
447- Store new insights and reflections for future reference
448- Provide semantic similarity scoring for relevance
449- Help you learn from previous sessions and avoid repeating work
451Use this to leverage your conversation history and build on previous insights.
452""",
453 "quick-search": """# Quick Search
455Quick search that returns only the count and top result for fast overview.
457This command will:
458- Perform rapid semantic search through conversation history
459- Return summary statistics and the most relevant result
460- Show total matches with different relevance thresholds
461- Provide fast insight into whether you've addressed a topic before
462- Optimize for speed when you need quick context checks
464Use this for rapid checks before diving into detailed work.
465""",
466 "search-summary": """# Search Summary
468Get aggregated insights from search results without individual result details.
470This command will:
471- Analyze patterns across multiple conversation matches
472- Extract common themes and recurring topics
473- Provide statistical overview of search results
474- Identify projects and contexts where topics were discussed
475- Generate high-level insights without overwhelming detail
477Use this to understand broad patterns and themes in your work history.
478""",
479 "reflection-stats": """# Reflection Stats
481Get statistics about the reflection database and conversation memory.
483This command will:
484- Show total stored conversations and reflections
485- Display database health and embedding provider status
486- Report memory system capacity and usage statistics
487- Verify semantic search system is functioning properly
488- Provide technical details about the knowledge storage system
490Use this to monitor your conversation memory system and ensure it's working optimally.
491""",
492 "search-code": """# Code Pattern Search
494Search for code patterns in your conversation history using AST (Abstract Syntax Tree) parsing.
496This command will:
497- Parse Python code blocks from conversations
498- Extract functions, classes, imports, loops, and other patterns
499- Rank results by relevance to your query
500- Show code context and project information
502Examples:
503- Search for functions: pattern_type='function'
504- Search for class definitions: pattern_type='class'
505- Search for error handling: query='try except'
507Use this to find code examples and patterns from your development sessions.
508""",
509 "search-errors": """# Error Pattern Search
511Search for error messages, exceptions, and debugging contexts in your conversation history.
513This command will:
514- Find Python tracebacks and exceptions
515- Detect JavaScript errors and warnings
516- Identify debugging and testing contexts
517- Show error context and solutions
519Examples:
520- Find Python errors: error_type='python_exception'
521- Find import issues: query='ImportError'
522- Find debugging sessions: query='debug'
524Use this to quickly find solutions to similar errors you've encountered before.
525""",
526 "search-temporal": """# Temporal Search
528Search your conversation history using natural language time expressions.
530This command will:
531- Parse time expressions like "yesterday", "last week", "2 days ago"
532- Find conversations within that time range
533- Optionally filter by additional search terms
534- Sort results by time and relevance
536Examples:
537- "yesterday" - conversations from yesterday
538- "last week" - conversations from the past week
539- "2 days ago" - conversations from 2 days ago
540- "this month" + query - filter by content within the month
542Use this to find recent discussions or work from specific time periods.
543""",
544 "auto-load-context": """# Auto-Context Loading
546Automatically detect current development context and load relevant conversations.
548This command will:
549- Analyze your current project structure and files
550- Detect programming languages and tools in use
551- Identify project type (web app, CLI tool, library, etc.)
552- Find recent file modifications
553- Load conversations relevant to your current context
554- Score conversations by relevance to current work
556Examples:
557- Load default context: auto_load_context()
558- Increase results: max_conversations=20
559- Lower threshold: min_relevance=0.2
561Use this at the start of coding sessions to get relevant context automatically.
562""",
563 "context-summary": """# Context Summary
565Get a quick summary of your current development context without loading conversations.
567This command will:
568- Detect current project name and type
569- Identify programming languages and tools
570- Show Git repository information
571- Display recently modified files
572- Calculate detection confidence score
574Use this to understand what context the system has detected about your current work.
575""",
576 "compress-memory": """# Memory Compression
578Compress conversation memory by consolidating old conversations into summaries.
580This command will:
581- Analyze conversation age and importance
582- Group related conversations into clusters
583- Create consolidated summaries of old conversations
584- Remove redundant conversation data
585- Calculate space savings and compression ratios
587Examples:
588- Default compression: compress_memory()
589- Preview changes: dry_run=True
590- Aggressive compression: max_age_days=14, importance_threshold=0.5
592Use this periodically to keep your conversation memory manageable and efficient.
593""",
594 "compression-stats": """# Compression Statistics
596Get detailed statistics about memory compression history and current database status.
598This command will:
599- Show last compression run details
600- Display space savings and compression ratios
601- Report current database size and conversation count
602- Show number of consolidated conversations
603- Provide compression efficiency metrics
605Use this to monitor memory usage and compression effectiveness.
606""",
607 "retention-policy": """# Retention Policy
609Configure memory retention policy parameters for automatic compression.
611This command will:
612- Set maximum conversation age and count limits
613- Configure importance threshold for retention
614- Define consolidation age triggers
615- Adjust compression ratio targets
617Examples:
618- Conservative: max_age_days=365, importance_threshold=0.2
619- Aggressive: max_age_days=90, importance_threshold=0.5
620- Custom: consolidation_age_days=14
622Use this to customize how your conversation memory is managed over time.
623""",
624 "start-app-monitoring": """# Start Application Monitoring
626Monitor your development activity to provide better context and insights.
628This command will:
629- Start file system monitoring for code changes
630- Track application focus (IDE, browser, terminal)
631- Monitor documentation site visits
632- Build activity profiles for better context
634Monitoring includes:
635- File modifications in your project directories
636- IDE and editor activity patterns
637- Browser navigation to documentation sites
638- Application focus and context switching
640Use this to automatically capture your development context for better session insights.
641""",
642 "stop-app-monitoring": """# Stop Application Monitoring
644Stop monitoring your development activity.
646This command will:
647- Stop file system monitoring
648- Stop application focus tracking
649- Preserve collected activity data
650- Clean up monitoring resources
652Use this when you want to pause monitoring or when you're done with a development session.
653""",
654 "activity-summary": """# Activity Summary
656Get a comprehensive summary of your recent development activity.
658This command will:
659- Show file modification patterns
660- List most active applications
661- Display visited documentation sites
662- Calculate productivity metrics
664Summary includes:
665- Event counts by type and application
666- Most actively edited files
667- Documentation resources consulted
668- Average relevance scores
670Use this to understand your development patterns and identify productive sessions.
671""",
672 "context-insights": """# Context Insights
674Analyze recent development activity for contextual insights.
676This command will:
677- Identify primary focus areas
678- Detect technologies being used
679- Count context switches
680- Calculate productivity scores
682Insights include:
683- Primary application focus
684- Active programming languages
685- Documentation topics explored
686- Project switching patterns
687- Overall productivity assessment
689Use this to understand your current development context and optimize your workflow.
690""",
691 "active-files": """# Active Files
693Show files that are currently being actively worked on.
695This command will:
696- List recently modified files
697- Show activity scores and patterns
698- Highlight most frequently changed files
699- Include project context
701File activity is scored based on:
702- Frequency of modifications
703- Recency of changes
704- File type and relevance
705- Project context
707Use this to quickly see what you're currently working on and resume interrupted tasks.
708""",
709 "crackerjack-run": """# Execute Crackerjack Command
711Execute a Crackerjack command and parse the output for insights.
713This command will:
714- Execute any Crackerjack command (test, lint, format, etc.)
715- Parse test results, lint issues, coverage data, and security findings
716- Extract memory insights from command output
717- Track execution history and performance metrics
718- Provide structured feedback on code quality
720Examples:
721- /crackerjack-run test -- Run all tests
722- /crackerjack-run lint -- Check code quality
723- /crackerjack-run coverage -- Generate coverage report
724- /crackerjack-run security -- Run security scan
726Use this to integrate code quality checks directly into your development workflow.
727""",
728 "crackerjack-history": """# Crackerjack Results History
730Get recent Crackerjack command execution history with parsed results.
732This command will:
733- Show recent Crackerjack command executions
734- Display test results, lint issues, and coverage trends
735- Filter by command type or time period
736- Highlight performance changes and quality metrics
737- Track project health over time
739Use this to monitor project quality trends and identify recurring issues.
740""",
741 "crackerjack-metrics": """# Quality Metrics Dashboard
743Get quality metrics trends from Crackerjack execution history.
745This command will:
746- Display test pass rates and coverage trends
747- Show lint issue patterns and code quality scores
748- Track security findings and vulnerability trends
749- Provide quality trend analysis with recommendations
750- Generate comprehensive project health reports
752Use this to get insights into your project's overall quality and identify areas for improvement.
753""",
754 "crackerjack-patterns": """# Test Failure Pattern Analysis
756Analyze test failure patterns and trends for debugging insights.
758This command will:
759- Identify frequently failing tests and common error patterns
760- Analyze test stability scores and duration trends
761- Highlight problematic files and modules
762- Provide debugging recommendations based on failure patterns
763- Track test health and reliability metrics
765Use this to proactively identify and fix recurring test issues before they become problems.
766""",
767}
# Register init prompt
@mcp.prompt("init")
async def get_session_init_prompt() -> str:
    """Initialize Claude session with comprehensive setup including UV dependencies and automation tools."""
    return SESSION_COMMANDS["init"]
# Register checkpoint prompt
# Each handler below follows the same pattern: the name given to @mcp.prompt
# is also the SESSION_COMMANDS key whose text the handler returns.
@mcp.prompt("checkpoint")
async def get_session_checkpoint_prompt() -> str:
    """Perform mid-session quality checkpoint with workflow analysis and optimization recommendations."""
    return SESSION_COMMANDS["checkpoint"]


# Register end prompt
@mcp.prompt("end")
async def get_session_end_prompt() -> str:
    """End Claude session with cleanup, learning capture, and handoff file creation."""
    return SESSION_COMMANDS["end"]


# Register status prompt
@mcp.prompt("status")
async def get_session_status_prompt() -> str:
    """Get current session status and project context information with health checks."""
    return SESSION_COMMANDS["status"]


# Register permissions prompt
@mcp.prompt("permissions")
async def get_session_permissions_prompt() -> str:
    """Manage session permissions for trusted operations to avoid repeated prompts."""
    return SESSION_COMMANDS["permissions"]


# Register reflect prompt
@mcp.prompt("reflect")
async def get_session_reflect_prompt() -> str:
    """Search past conversations and store reflections with semantic similarity."""
    return SESSION_COMMANDS["reflect"]


# Register quick-search prompt
@mcp.prompt("quick-search")
async def get_session_quick_search_prompt() -> str:
    """Quick search that returns only the count and top result for fast overview."""
    return SESSION_COMMANDS["quick-search"]


# Register search-summary prompt
@mcp.prompt("search-summary")
async def get_session_search_summary_prompt() -> str:
    """Get aggregated insights from search results without individual result details."""
    return SESSION_COMMANDS["search-summary"]


# Register reflection-stats prompt
@mcp.prompt("reflection-stats")
async def get_session_reflection_stats_prompt() -> str:
    """Get statistics about the reflection database and conversation memory."""
    return SESSION_COMMANDS["reflection-stats"]


# Register Crackerjack prompts
# NOTE(review): the remaining SESSION_COMMANDS keys (search-code,
# search-errors, compress-memory, ...) have no handler in this part of the
# file - confirm whether they are registered elsewhere.
@mcp.prompt("crackerjack-run")
async def get_crackerjack_run_prompt() -> str:
    """Execute a Crackerjack command and parse the output for insights."""
    return SESSION_COMMANDS["crackerjack-run"]


@mcp.prompt("crackerjack-history")
async def get_crackerjack_history_prompt() -> str:
    """Get recent Crackerjack command execution history with parsed results."""
    return SESSION_COMMANDS["crackerjack-history"]


@mcp.prompt("crackerjack-metrics")
async def get_crackerjack_metrics_prompt() -> str:
    """Get quality metrics trends from Crackerjack execution history."""
    return SESSION_COMMANDS["crackerjack-metrics"]


@mcp.prompt("crackerjack-patterns")
async def get_crackerjack_patterns_prompt() -> str:
    """Analyze test failure patterns and trends for debugging insights."""
    return SESSION_COMMANDS["crackerjack-patterns"]
# Global instances
# Constructed at import time. SessionPermissionsManager is a singleton, so any
# later SessionPermissionsManager(...) call returns this same object.
permissions_manager = SessionPermissionsManager(claude_dir)
current_project = None

# New global instances for multi-project and advanced search
# These start as None and are assigned by initialize_new_features(); they stay
# None when the corresponding optional module is unavailable.
multi_project_coordinator: Any | None = None
advanced_search_engine: Any | None = None
app_config: Any | None = None
async def initialize_new_features() -> None:
    """Initialize multi-project coordination and advanced search features.

    Populates the module-level ``app_config``, ``multi_project_coordinator``
    and ``advanced_search_engine`` globals, each only when its optional module
    was importable. Failures while setting up the database-backed features are
    logged and otherwise ignored, leaving the affected globals unchanged.
    """
    global multi_project_coordinator, advanced_search_engine, app_config

    # Load configuration
    if CONFIG_AVAILABLE:
        app_config = get_config()

    # The remaining features all need the reflection database.
    if not REFLECTION_TOOLS_AVAILABLE:
        return

    try:
        db = await get_reflection_database()

        # Initialize multi-project coordinator
        if MULTI_PROJECT_AVAILABLE:
            multi_project_coordinator = MultiProjectCoordinator(db)

        # Initialize advanced search engine
        if ADVANCED_SEARCH_AVAILABLE:
            advanced_search_engine = AdvancedSearchEngine(db)
    except Exception as exc:
        # Optional features must never block the caller, but a silent failure
        # is hard to diagnose - record what went wrong.
        session_logger.warning(
            "Optional feature initialization failed",
            error=str(exc),
            error_type=type(exc).__name__,
        )
def validate_claude_directory() -> dict[str, Any]:
    """Make sure the basic ~/.claude layout exists and report on each part.

    Returns:
        ``{"valid": True, "component_status": {...}}`` where the status
        mapping is keyed by directory name and says whether the directory was
        already present or had to be created.
    """
    statuses = {}

    for directory in (claude_dir, claude_dir / "data", claude_dir / "logs"):
        if directory.exists():
            statuses[directory.name] = "✅ Present"
            continue
        directory.mkdir(parents=True, exist_ok=True)
        statuses[directory.name] = "✅ Created"

    return {"valid": True, "component_status": statuses}
async def analyze_project_context(project_dir: Path) -> dict[str, bool]:
    """Report which common project markers are present under ``project_dir``.

    Args:
        project_dir: Directory to inspect.

    Returns:
        Seven boolean indicators (``python_project``, ``git_repo``,
        ``has_tests``, ``has_docs``, ``has_requirements``, ``has_uv_lock``,
        ``has_mcp_config``). All are False when the directory does not exist
        or an ``OSError`` occurs while inspecting it; in the error case a
        warning is also printed to stderr.
    """
    indicator_names = (
        "python_project",
        "git_repo",
        "has_tests",
        "has_docs",
        "has_requirements",
        "has_uv_lock",
        "has_mcp_config",
    )

    def nothing_detected() -> dict[str, bool]:
        return dict.fromkeys(indicator_names, False)

    def has_entry(name: str) -> bool:
        return (project_dir / name).exists()

    def has_match(pattern: str) -> bool:
        return any(project_dir.glob(pattern))

    try:
        # Ensure project_dir exists and is accessible
        if not project_dir.exists():
            return nothing_detected()

        return {
            "python_project": has_entry("pyproject.toml"),
            "git_repo": has_entry(".git"),
            # Cheap top-level check first; the recursive glob only runs if needed.
            "has_tests": has_match("test*") or has_match("**/test*"),
            "has_docs": has_entry("README.md") or has_match("docs/**"),
            "has_requirements": has_entry("requirements.txt"),
            "has_uv_lock": has_entry("uv.lock"),
            "has_mcp_config": has_entry(".mcp.json"),
        }
    except OSError as err:  # PermissionError is a subclass of OSError
        # Log error but return safe defaults
        print(
            f"Warning: Could not analyze project context for {project_dir}: {err}",
            file=sys.stderr,
        )
        return nothing_detected()
def _setup_claude_directory(output: list[str]) -> dict:
    """Run phase 1: validate the Claude directory and report its status.

    Args:
        output: Message list that progress lines are appended to.

    Returns:
        The validation result produced by ``validate_claude_directory``.
    """
    output.append("\n📋 Phase 1: Claude directory setup...")

    claude_validation = validate_claude_directory()
    output.append("✅ Claude directory structure ready")

    # One status line per checked directory.
    output.extend(
        f" {status} {component}"
        for component, status in claude_validation["component_status"].items()
    )

    return claude_validation
def _setup_uv_dependencies(output: list[str], current_dir: Path) -> None:
    """Run phase 2: report UV availability and trust, then run UV operations.

    Args:
        output: Message list that progress lines are appended to.
        current_dir: Project directory the UV operations should target.
    """
    output.append("\n🔧 Phase 2: UV dependency management & session setup...")

    uv_available = shutil.which("uv") is not None
    availability = "✅ AVAILABLE" if uv_available else "❌ NOT FOUND"
    output.append(f"🔍 UV package manager: {availability}")

    # Trust state is reported even when UV itself is missing.
    uv_trusted = permissions_manager.is_operation_trusted(
        permissions_manager.TRUSTED_UV_OPERATIONS,
    )
    output.append(
        "🔐 UV operations: ✅ TRUSTED (no prompts needed)"
        if uv_trusted
        else "🔐 UV operations: ⚠️ Will require permission prompts"
    )

    if uv_available:
        _handle_uv_operations(output, current_dir, uv_trusted)
    else:
        output.append("💡 Install UV: curl -LsSf https://astral.sh/uv/install.sh | sh")
def _handle_uv_operations(
    output: list[str],
    current_dir: Path,
    uv_trusted: bool,
) -> None:
    """Run UV dependency operations inside ``current_dir``.

    Does nothing beyond reporting when the project has no ``pyproject.toml``.
    Otherwise it temporarily switches the process working directory to the
    project, marks UV operations as trusted if they were not already, and
    runs the sync/compile step. Any error is reported in ``output`` rather
    than raised, and the original working directory is always restored.

    Args:
        output: Message list that progress lines are appended to.
        current_dir: Project directory to operate in.
        uv_trusted: Whether UV operations were already trusted.
    """
    if not (current_dir / "pyproject.toml").exists():
        output.extend(
            [
                "⚠️ No pyproject.toml found - skipping UV operations",
                "💡 Create pyproject.toml to enable UV dependency management",
            ],
        )
        return

    previous_cwd = Path.cwd()
    try:
        os.chdir(current_dir)
        output.append(f"📁 Working in: {current_dir}")

        # Trust is granted up front, before the UV commands are executed.
        if not uv_trusted:
            output.append("🔓 Trusting UV operations for this session...")
            permissions_manager.trust_operation(
                permissions_manager.TRUSTED_UV_OPERATIONS,
                "UV package management operations",
            )
            output.append("✅ UV operations now trusted - no more prompts needed")

        _run_uv_sync_and_compile(output, current_dir)
    except Exception as exc:
        output.append(f"⚠️ UV operation error: {exc}")
    finally:
        os.chdir(previous_cwd)
def _run_uv_sync_and_compile(output: list[str], current_dir: Path) -> None:
    """Run ``uv sync`` and, if needed, generate ``requirements.txt``.

    Both commands run in the process's current working directory. The
    requirements file is only compiled when the sync succeeded and
    ``current_dir`` does not already contain one.

    Args:
        output: Message list that progress lines are appended to.
        current_dir: Project directory checked for an existing requirements file.
    """
    sync = subprocess.run(
        ["uv", "sync"], check=False, capture_output=True, text=True
    )
    if sync.returncode != 0:
        output.append(f"⚠️ UV sync issues: {sync.stderr}")
        return

    output.append("✅ UV sync completed successfully")

    if (current_dir / "requirements.txt").exists():
        output.append("✅ Requirements.txt already exists")
        return

    compile_command = [
        "uv",
        "pip",
        "compile",
        "pyproject.toml",
        "--output-file",
        "requirements.txt",
    ]
    compiled = subprocess.run(
        compile_command,
        check=False,
        capture_output=True,
        text=True,
    )
    if compiled.returncode == 0:
        output.append("✅ Requirements.txt generated from UV dependencies")
    else:
        output.append(
            f"⚠️ Requirements compilation warning: {compiled.stderr}",
        )
def _setup_session_management(output: list[str]) -> None:
    """Append the phase 3 and phase 4 status banners to ``output``.

    Args:
        output: Message list that the status lines are appended to.
    """
    phase_three = [
        "\n🔧 Phase 3: Session management setup...",
        "✅ Session management functionality ready",
        " 📊 Conversation memory system enabled",
        " 🔍 Semantic search capabilities available",
    ]
    phase_four = [
        "\n🧠 Phase 4: Integrated MCP services initialization...",
        "\n📊 Integrated MCP Services Status:",
        "✅ Session Management - Active (conversation memory enabled)",
    ]
    output.extend(phase_three + phase_four)
async def _analyze_project_structure(
    output: list[str],
    current_dir: Path,
    current_project: str,
) -> tuple[dict, int]:
    """Run phase 5: analyze the project layout and describe it in ``output``.

    Args:
        output: Message list that progress lines are appended to.
        current_dir: Project directory to analyze.
        current_project: Project name used in the phase banner.

    Returns:
        The indicator mapping from ``analyze_project_context`` and the number
        of indicators that were detected.
    """
    output.append(f"\n🎯 Phase 5: Project context analysis for {current_project}...")

    project_context = await analyze_project_context(current_dir)
    indicator_total = len(project_context)
    context_score = len([flag for flag in project_context.values() if flag])

    output.append("🔍 Project structure analysis:")
    for context_type, detected in project_context.items():
        marker = "✅" if detected else "➖"
        label = context_type.replace("_", " ").title()
        output.append(f" {marker} {label}")

    output.append(
        f"\n📊 Project maturity: {context_score}/{indicator_total} indicators",
    )

    # At most one missing indicator is "excellent"; at least half is "good".
    if context_score >= indicator_total - 1:
        verdict = "🌟 Excellent project structure - well-organized codebase"
    elif context_score >= indicator_total // 2:
        verdict = "👍 Good project structure - solid foundation"
    else:
        verdict = "💡 Basic project - consider adding structure"
    output.append(verdict)

    return project_context, context_score
def _add_final_summary(
    output: list[str],
    current_project: str,
    context_score: int,
    project_context: dict,
    claude_validation: dict,
) -> None:
    """Append the closing initialization summary to ``output``.

    Args:
        output: Message list that summary lines are appended to.
        current_project: Name of the project being initialized.
        context_score: Number of detected project indicators.
        project_context: Indicator mapping the score was derived from.
        claude_validation: Validation result; its optional ``missing_files``
            entry downgrades the readiness message when non-empty.
    """
    separator = "=" * 60
    indicator_total = len(project_context)
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    output.extend(
        [
            "\n" + separator,
            f"🎯 {current_project.upper()} SESSION INITIALIZATION COMPLETE",
            separator,
            f"📅 Initialized: {timestamp}",
            f"🗂️ Project: {current_project}",
            f"📊 Structure score: {context_score}/{indicator_total}",
        ],
    )

    missing_files = claude_validation.get("missing_files", [])
    needs_setup = bool(missing_files) or context_score < indicator_total // 2
    if needs_setup:
        output.append("⚠️ Session ready with minor setup opportunities identified")
    else:
        output.append("✅ Ready for productive session - all systems optimal")

    _add_permissions_and_tools_summary(output, current_project)
def _add_permissions_and_tools_summary(output: list[str], current_project: str) -> None:
    """Append the permissions summary, server guidance and tool list.

    Args:
        output: Message list that summary lines are appended to.
        current_project: Project name used in the closing line.
    """
    # Permissions summary
    trusted_count = permissions_manager.get_permission_status()[
        "trusted_operations_count"
    ]
    output.append("\n🔐 Session Permissions Summary:")
    output.append(f" 📊 Trusted operations: {trusted_count}")
    output.append(
        " ✅ Future operations will have reduced permission prompts"
        if trusted_count > 0
        else " 💡 Operations will be trusted automatically on first use"
    )

    # Guidance about other detected MCP servers, when there is any.
    server_guidance = _generate_server_guidance(_detect_other_mcp_servers())
    if server_guidance:
        output.append("\n" + "\n".join(server_guidance))

    output.extend(
        [
            "\n📋 AVAILABLE MCP TOOLS:",
            "📊 Session Management:",
            "• checkpoint - Mid-session quality assessment",
            "• end - Complete session cleanup",
            "• status - Current session status",
            "• permissions - Manage trusted operations",
            "• Built-in conversation memory with semantic search",
            f"\n✨ {current_project} session initialization complete via MCP!",
        ],
    )
@mcp.tool()
async def init(working_directory: str | None = None) -> str:
    """Initialize Claude session with comprehensive setup including UV dependencies and automation tools.

    Args:
        working_directory: Optional working directory override (defaults to PWD environment variable or current directory)

    Returns:
        The full initialization report as one newline-joined string.
    """
    global current_project

    output = [
        "🚀 Claude Session Initialization via MCP Server",
        "=" * 60,
    ]

    # Resolution order: explicit argument, then $PWD, then the process cwd.
    current_dir = (
        Path(working_directory)
        if working_directory
        else Path(os.environ.get("PWD", Path.cwd()))
    )

    current_project = current_dir.name
    output.append(f"📁 Current project: {current_project}")

    # Run the initialization phases in order.
    claude_validation = _setup_claude_directory(output)
    _setup_uv_dependencies(output, current_dir)
    _setup_session_management(output)
    project_context, context_score = await _analyze_project_structure(
        output,
        current_dir,
        current_project,
    )
    _add_final_summary(
        output,
        current_project,
        context_score,
        project_context,
        claude_validation,
    )

    return "\n".join(output)
async def calculate_quality_score() -> dict[str, Any]:
    """Calculate a 0-100 session quality score from four weighted factors.

    The factors are project health (up to 40 points), trusted permissions
    (up to 20), session management availability (20 or 5) and tool
    availability (20 or 10).

    Returns:
        A dict with ``total_score`` (int), a per-factor ``breakdown`` and a
        list of textual ``recommendations``.
    """
    current_dir = Path(os.environ.get("PWD", Path.cwd()))

    # Project health: share of detected indicators, scaled to 40 points.
    project_context = await analyze_project_context(current_dir)
    detected_count = sum(1 for detected in project_context.values() if detected)
    project_score = (detected_count / len(project_context)) * 40

    # Permissions: 5 points per trusted operation, capped at 20.
    permissions_count = len(permissions_manager.trusted_operations)
    permissions_score = min(permissions_count * 5, 20)

    # Session management availability.
    session_score = 20 if SESSION_MANAGEMENT_AVAILABLE else 5

    # Tool availability.
    uv_available = shutil.which("uv") is not None
    tool_score = 20 if uv_available else 10

    total_score = int(project_score + permissions_score + session_score + tool_score)
    breakdown = {
        "project_health": project_score,
        "permissions": permissions_score,
        "session_management": session_score,
        "tools": tool_score,
    }
    recommendations = _generate_quality_recommendations(
        total_score,
        project_context,
        permissions_count,
        uv_available,
    )

    return {
        "total_score": total_score,
        "breakdown": breakdown,
        "recommendations": recommendations,
    }
def _generate_quality_recommendations(
    score: int,
    project_context: dict,
    permissions_count: int,
    uv_available: bool,
) -> list[str]:
    """Build the list of quality recommendations for a session.

    Args:
        score: Total quality score.
        project_context: Indicator mapping; ``has_tests`` and ``has_docs``
            are consulted.
        permissions_count: Number of trusted operations.
        uv_available: Whether the UV package manager was found.

    Returns:
        A headline recommendation followed by any project, permissions and
        tooling recommendations that apply.
    """
    if score >= 75:
        headline = "Excellent session quality - maintain current practices"
    elif score >= 50:
        headline = "Good session health - minor optimizations available"
    else:
        headline = "Session needs attention - multiple areas for improvement"
    advice = [headline]

    # Project-specific recommendations
    project_hints = (
        ("has_tests", "Consider adding tests to improve project structure"),
        ("has_docs", "Documentation would enhance project maturity"),
    )
    advice.extend(
        hint for indicator, hint in project_hints if not project_context.get(indicator)
    )

    # Permissions recommendations
    if permissions_count == 0:
        advice.append(
            "No trusted operations yet - permissions will be granted on first use",
        )
    elif permissions_count > 5:
        advice.append("Many trusted operations - consider reviewing for security")

    # Tools recommendations
    if not uv_available:
        advice.append("Install UV package manager for better dependency management")

    return advice
def should_suggest_compact() -> tuple[bool, str]:
    """Determine if compacting would be beneficial and provide reasoning.

    Heuristics, checked in order against the directory named by ``$PWD``
    (falling back to the process working directory):

    1. More than 50 source files outside hidden files/directories.
    2. At least 3 git commits in the last 24 hours.
    3. At least 10 entries reported by ``git status --porcelain``.
    4. Both a ``tests`` directory and a ``pyproject.toml`` are present.

    Returns:
        ``(should_compact, reason)``. If the assessment itself fails,
        compaction is suggested as a precaution.
    """
    source_suffixes = {
        ".py",
        ".js",
        ".ts",
        ".jsx",
        ".tsx",
        ".go",
        ".rs",
        ".java",
        ".cpp",
        ".c",
        ".h",
    }
    file_threshold = 50

    try:
        current_dir = Path(os.environ.get("PWD", Path.cwd()))

        # Count significant files in project as a complexity indicator.
        # Hidden entries are judged relative to the project root, so a
        # project that itself lives under a dot-directory is still counted,
        # and hidden directories (.git, .venv, ...) are never descended into.
        file_count = 0
        for root, dirnames, filenames in os.walk(current_dir):
            dirnames[:] = [name for name in dirnames if not name.startswith(".")]
            for filename in filenames:
                if filename.startswith("."):
                    continue
                if Path(filename).suffix not in source_suffixes:
                    continue
                if (Path(root) / filename).is_file():
                    file_count += 1
            if file_count > file_threshold:  # Stop counting after threshold
                break

        # Large project heuristic
        if file_count > file_threshold:
            return (
                True,
                "Large codebase with 50+ source files detected - context compaction recommended",
            )

        # Check for active development via git
        if (current_dir / ".git").exists():
            try:
                # No shell is involved, so the date must not carry shell quotes.
                log_result = subprocess.run(
                    ["git", "log", "--oneline", "-20", "--since=24 hours ago"],
                    check=False,
                    capture_output=True,
                    text=True,
                    cwd=current_dir,
                    timeout=5,
                )
                if log_result.returncode == 0:
                    recent_commits = sum(
                        1 for line in log_result.stdout.splitlines() if line.strip()
                    )
                    if recent_commits >= 3:
                        return (
                            True,
                            f"High development activity ({recent_commits} commits in 24h) - compaction recommended",
                        )

                # Check for large number of modified files
                status_result = subprocess.run(
                    ["git", "status", "--porcelain"],
                    check=False,
                    capture_output=True,
                    text=True,
                    cwd=current_dir,
                    timeout=5,
                )
                if status_result.returncode == 0:
                    modified_files = sum(
                        1 for line in status_result.stdout.splitlines() if line.strip()
                    )
                    if modified_files >= 10:
                        return (
                            True,
                            f"Many modified files ({modified_files}) detected - context optimization beneficial",
                        )
            except Exception:
                # Git probing is best-effort (timeout, git missing, ...):
                # fall through to the remaining heuristics.
                pass

        # Check for common patterns suggesting complex session
        if (current_dir / "tests").exists() and (
            current_dir / "pyproject.toml"
        ).exists():
            return (
                True,
                "Python project with tests detected - compaction may improve focus",
            )

        # Default to not suggesting unless we have clear indicators
        return False, "Context appears manageable - compaction not immediately needed"

    except Exception:
        # If we can't determine, err on the side of suggesting compaction for safety
        return (
            True,
            "Unable to assess context complexity - compaction may be beneficial as a precaution",
        )
async def _optimize_reflection_database() -> str:
    """Optimize the reflection database and report the space reclaimed.

    Runs ``VACUUM`` and ``ANALYZE`` on the database connection (when one is
    open) in the default executor, comparing the database file size before
    and after.

    Returns:
        A one-line status message. Failures are reported in the message
        rather than raised.
    """
    try:
        from .reflection_tools import get_reflection_database

        db = await get_reflection_database()
        # Result is unused. NOTE(review): presumably called for a side
        # effect such as touching the connection - confirm before removing.
        await db.get_stats()

        db_file = Path(db.db_path)
        db_size_before = db_file.stat().st_size if db_file.exists() else 0

        if db.conn:
            # get_running_loop() is the documented way to obtain the loop
            # from inside a coroutine.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, lambda: db.conn.execute("VACUUM"))
            await loop.run_in_executor(None, lambda: db.conn.execute("ANALYZE"))

        db_size_after = db_file.stat().st_size if db_file.exists() else 0
        space_saved = db_size_before - db_size_after

        if space_saved > 0:
            detail = "Optimized reflection DB, saved " + str(space_saved) + " bytes"
        else:
            detail = "Reflection DB already optimized"
        return f"🗄️ Database: {detail}"

    except ImportError:
        return "ℹ️ Database: Reflection tools not available"
    except Exception as e:
        return f"⚠️ Database: Optimization skipped - {str(e)[:50]}"
def _cleanup_session_logs() -> str:
    """Delete all but the ten most recently modified session log files.

    Returns:
        A one-line status message. Failures are reported in the message
        rather than raised.
    """
    try:
        log_dir = claude_dir / "logs"
        if not log_dir.exists():
            return "📝 Logs: No log directory found"

        # Oldest first, so everything before the last ten is expendable.
        by_age = sorted(
            log_dir.glob("session_management_*.log"),
            key=lambda log: log.stat().st_mtime,
        )
        if len(by_age) <= 10:
            return "📝 Logs: Within retention limit (10 files)"

        expired = by_age[:-10]
        total_size = 0
        for log in expired:
            total_size += log.stat().st_size

        for log in expired:
            log.unlink()

        return (
            f"📝 Logs: Cleaned {len(expired)} old files, freed {total_size:,} bytes"
        )

    except Exception as exc:
        return f"⚠️ Logs: Cleanup failed - {str(exc)[:50]}"
def _cleanup_temp_files(current_dir: Path) -> str:
    """Clean up temporary files in the project directory.

    Removes ``__pycache__`` directories and ``.DS_Store`` files anywhere
    below ``current_dir``, the top-level ``.pytest_cache`` directory, and
    all but the three most recent coverage data files.

    Args:
        current_dir: Project directory to clean.

    Returns:
        A one-line status message with the number of items removed and the
        bytes freed. Failures are reported in the message rather than raised.
    """
    try:
        temp_cleaned = 0
        temp_size = 0

        # Clean Python cache files. The matches are collected first so the
        # tree is not modified while it is still being traversed.
        for cache_dir in list(current_dir.rglob("__pycache__")):
            if cache_dir.is_symlink() or not cache_dir.is_dir():
                continue
            temp_size += sum(
                entry.stat().st_size
                for entry in cache_dir.rglob("*")
                if entry.is_file()
            )
            # rmtree also copes with nested directories inside the cache.
            shutil.rmtree(cache_dir)
            temp_cleaned += 1

        # Clean .DS_Store files on macOS
        for ds_file in list(current_dir.rglob(".DS_Store")):
            temp_size += ds_file.stat().st_size
            ds_file.unlink()
            temp_cleaned += 1

        # Clean pytest cache
        pytest_cache = current_dir / ".pytest_cache"
        if pytest_cache.exists():
            shutil.rmtree(pytest_cache)
            temp_cleaned += 1

        # Clean coverage files (keep most recent 3). Only real data files
        # (".coverage" and ".coverage.<suffix>") qualify; a bare ".coverage*"
        # match would also pick up the ".coveragerc" configuration file.
        coverage_files = sorted(
            (
                candidate
                for candidate in current_dir.glob(".coverage*")
                if candidate.is_file()
                and (
                    candidate.name == ".coverage"
                    or candidate.name.startswith(".coverage.")
                )
            ),
            key=lambda x: x.stat().st_mtime,
        )
        if len(coverage_files) > 3:
            for cov_file in coverage_files[:-3]:
                temp_size += cov_file.stat().st_size
                cov_file.unlink()
                temp_cleaned += 1

        if temp_cleaned > 0:
            detail = (
                "Cleaned "
                + str(temp_cleaned)
                + " items, freed "
                + str(temp_size)
                + " bytes"
            )
        else:
            detail = "No cleanup needed"
        return f"🧹 Temp files: {detail}"

    except Exception as e:
        return f"⚠️ Temp files: Cleanup failed - {str(e)[:50]}"
def _optimize_git_repository(current_dir: Path) -> list[str]:
    """Run lightweight git maintenance in ``current_dir``.

    Runs ``git gc --auto``, reports the size of the ``.git`` directory and
    prunes stale remote-tracking branches of ``origin``.

    Args:
        current_dir: Directory expected to contain a ``.git`` entry.

    Returns:
        One or two status messages. Timeouts and other failures are
        reported as a single message rather than raised.
    """

    def run_git(arguments: list[str], limit: int) -> subprocess.CompletedProcess:
        return subprocess.run(
            ["git", *arguments],
            check=False,
            cwd=current_dir,
            capture_output=True,
            text=True,
            timeout=limit,
        )

    try:
        git_dir = current_dir / ".git"
        if not git_dir.exists():
            return ["📦 Git: Not a git repository"]

        # Garbage collection
        if run_git(["gc", "--auto"], 30).returncode != 0:
            return ["📦 Git: Optimization skipped (no changes)"]

        # Repository size
        git_size = 0
        for entry in git_dir.rglob("*"):
            if entry.is_file():
                git_size += entry.stat().st_size
        messages = [f"📦 Git: Repository optimized ({git_size:,} bytes)"]

        # Prune remote tracking branches; only mention it if git reported any.
        pruned = run_git(["remote", "prune", "origin"], 15)
        if pruned.returncode == 0 and pruned.stdout.strip():
            messages.append("🌿 Git: Pruned stale remote branches")

        return messages

    except subprocess.TimeoutExpired:
        return ["⏱️ Git: Optimization timeout (large repository)"]
    except Exception as exc:
        return [f"⚠️ Git: Optimization failed - {str(exc)[:50]}"]
def _cleanup_uv_cache() -> str:
    """Run ``uv cache clean`` when UV is installed.

    Returns:
        A one-line status message. Timeouts and other failures are reported
        in the message rather than raised.
    """
    try:
        if shutil.which("uv") is None:
            return "📦 UV: Not available"

        completed = subprocess.run(
            ["uv", "cache", "clean"],
            check=False,
            capture_output=True,
            text=True,
            timeout=30,
        )
        if completed.returncode == 0:
            return "📦 UV: Package cache cleaned"
        return "📦 UV: Cache already clean"

    except subprocess.TimeoutExpired:
        return "⏱️ UV: Cache cleanup timeout"
    except Exception as exc:
        return f"⚠️ UV: Cache cleanup failed - {str(exc)[:50]}"
async def _analyze_context_compaction() -> list[str]:
    """Analyze whether compaction is advisable and store a context summary.

    Returns:
        Report lines covering the compaction recommendation, the key topics
        and first decision of the summarized conversation, and whether the
        summary was stored. Any failure is reported as a final warning line.
    """
    report: list[str] = []

    try:
        should_compact, reason = should_suggest_compact()
        report += ["\n🔍 Context Compaction Analysis", f"📊 {reason}"]

        if not should_compact:
            report.append("✅ Context appears well-optimized for current session")
        else:
            report += [
                "",
                "🔄 RECOMMENDATION: Run /compact to optimize context",
                "📝 Benefits of compaction:",
                " • Improved response speed and accuracy",
                " • Better focus on current development context",
                " • Reduced memory usage for complex sessions",
                " • Cleaner conversation flow",
                "",
                "💡 WORKFLOW: After this checkpoint completes, run: /compact",
                "🔄 Context compaction should be applied for optimal performance",
            ]

        report.append(
            "💡 This checkpoint includes intelligent conversation summarization",
        )

        # Summarize the conversation and surface its highlights.
        conversation_summary = await summarize_current_conversation()

        topics = conversation_summary["key_topics"]
        if topics:
            report.append(f"📋 Session focus: {', '.join(topics[:3])}")

        decisions = conversation_summary["decisions_made"]
        if decisions:
            report.append(f"✅ Key decision: {decisions[0]}")

        # Store context summary for post-compaction retrieval
        await _store_context_summary(conversation_summary)
        report.append("💾 Context summary stored for post-compaction retrieval")

    except Exception as exc:
        report.append(f"⚠️ Context summary storage failed: {str(exc)[:50]}")

    return report
async def _store_context_summary(conversation_summary: dict) -> None:
    """Store comprehensive context summary.

    Joins the summary's key topics, decisions and next steps into a single
    reflection and stores it in the reflection database, tagged with the
    current project name.

    Args:
        conversation_summary: Must provide ``key_topics``, ``decisions_made``
            and ``next_steps``, each an iterable of strings.

    Raises:
        Exception: If building or storing the summary fails. The original
            error is attached as ``__cause__``.
    """
    try:
        db = await get_reflection_database()
        context_summary = (
            f"Pre-compaction context summary: {', '.join(conversation_summary['key_topics'])}. "
            f"Decisions: {', '.join(conversation_summary['decisions_made'])}. "
            f"Next steps: {', '.join(conversation_summary['next_steps'])}"
        )

        await db.store_reflection(
            context_summary,
            [
                "pre-compaction",
                "context-summary",
                "checkpoint",
                current_project or "unknown-project",
            ],
        )
    except Exception as e:
        msg = f"Context summary storage failed: {str(e)[:50]}"
        # Chain the cause so the full original traceback stays available;
        # the message itself only carries a truncated excerpt.
        raise Exception(msg) from e
async def perform_strategic_compaction() -> list[str]:
    """Perform strategic compaction and optimization tasks.

    Runs, in order: reflection database optimization, session log cleanup,
    temp file cleanup, git optimization, UV cache cleanup and the context
    compaction analysis.

    Returns:
        All status lines produced by those steps, followed by a two-line
        summary. The reported task count covers only the optimization steps
        and excludes results flagged as informational, warning or timeout.
    """
    current_dir = Path(os.environ.get("PWD", Path.cwd()))

    task_results = [
        await _optimize_reflection_database(),  # Database optimization
        _cleanup_session_logs(),  # Log cleanup
        _cleanup_temp_files(current_dir),  # Temp file cleanup
        *_optimize_git_repository(current_dir),  # Git optimization
        _cleanup_uv_cache(),  # UV cache cleanup
    ]

    # Count before the analysis lines are added: its headings, bullets and
    # blank separators are report text, not optimization tasks.
    total_operations = sum(
        1 for line in task_results if not line.startswith(("ℹ️", "⚠️", "⏱️"))
    )

    results = list(task_results)

    # Context compaction analysis
    results.extend(await _analyze_context_compaction())

    # Summary
    results.extend(
        [
            f"\n📊 Strategic compaction complete: {total_operations} optimization tasks performed",
            "🎯 Recommendation: Conversation context should be compacted automatically",
        ],
    )

    return results
async def _generate_basic_insights(
    quality_score: float,
    conversation_summary: dict[str, Any],
) -> list[str]:
    """Turn the quality score and conversation summary into insight lines.

    Args:
        quality_score: Session quality score, reported as ``<score>/100``.
        conversation_summary: Must provide ``key_topics``, ``decisions_made``
            and ``next_steps`` lists.

    Returns:
        The score line, followed by a line for the first three topics, the
        first decision and the first next step, each only when non-empty.
    """
    topics = conversation_summary["key_topics"]
    decisions = conversation_summary["decisions_made"]
    next_steps = conversation_summary["next_steps"]

    candidates = [
        (True, f"Session checkpoint completed with quality score: {quality_score}/100"),
        (topics, f"Key discussion topics: {', '.join(topics[:3])}"),
        (decisions, f"Important decisions: {decisions[0]}" if decisions else ""),
        (next_steps, f"Next steps identified: {next_steps[0]}" if next_steps else ""),
    ]
    return [line for applies, line in candidates if applies]
async def _add_project_context_insights(insights: list[str]) -> None:
    """Append a line naming the project indicators that were detected.

    Nothing is appended when no indicator is detected.

    Args:
        insights: Insight list to extend in place.
    """
    current_dir = Path(os.environ.get("PWD", Path.cwd()))
    project_context = await analyze_project_context(current_dir)

    detected_names = [name for name, found in project_context.items() if found]
    if not detected_names:
        return
    insights.append(f"Active project context: {', '.join(detected_names)}")
def _add_session_health_insights(insights: list[str], quality_score: float) -> None:
    """Append one health assessment line chosen by ``quality_score``.

    Args:
        insights: Insight list to extend in place.
        quality_score: Score compared against the 80 and 60 thresholds.
    """
    if quality_score < 60:
        assessment = (
            "Session requires attention - potential workflow improvements needed"
        )
    elif quality_score < 80:
        assessment = "Good session progress with minor optimization opportunities"
    else:
        assessment = "Excellent session progress with optimal workflow patterns"
    insights.append(assessment)
def _generate_session_tags(quality_score: float) -> list[str]:
    """Build the tag list used when storing a session reflection.

    Args:
        quality_score: Score deciding whether a quality tag is added.

    Returns:
        The base tags (including the current project name, or a placeholder
        when none is set), plus ``excellent-session`` for scores of 80 or
        more, or ``needs-attention`` for scores below 60.
    """
    project_tag = current_project or "unknown-project"
    tags = ["checkpoint", "session-summary", project_tag]

    if quality_score >= 80:
        return [*tags, "excellent-session"]
    if quality_score < 60:
        return [*tags, "needs-attention"]
    return tags
async def _capture_flow_analysis(db, tags: list[str], results: list[str]) -> None:
    """Store a conversation flow summary as a reflection.

    Args:
        db: Reflection database; its ``store_reflection`` coroutine is awaited.
        tags: Base tags; ``flow-analysis`` and ``phase3`` are added.
        results: Report list that receives a confirmation line.
    """
    flow_analysis = await analyze_conversation_flow()

    summary_parts = [f"Session pattern: {flow_analysis['pattern_type']}. "]
    recommendations = flow_analysis["recommendations"]
    if recommendations:
        summary_parts.append(f"Key recommendation: {recommendations[0]}")

    flow_id = await db.store_reflection(
        "".join(summary_parts),
        [*tags, "flow-analysis", "phase3"],
    )
    results.append(f"🔄 Flow analysis stored: {flow_id[:12]}...")
async def _capture_intelligence_insights(
    db,
    tags: list[str],
    results: list[str],
) -> None:
    """Store the top session intelligence action as a reflection.

    Nothing is stored when there are no priority actions.

    Args:
        db: Reflection database; its ``store_reflection`` coroutine is awaited.
        tags: Base tags; ``intelligence`` and ``proactive`` are added.
        results: Report list that receives a confirmation line.
    """
    intelligence = await generate_session_intelligence()
    priority_actions = intelligence["priority_actions"]
    if not priority_actions:
        return

    intel_summary = (
        f"Session intelligence: {intelligence['intelligence_level']}. "
        f"Priority: {priority_actions[0]}"
    )
    intel_id = await db.store_reflection(
        intel_summary,
        [*tags, "intelligence", "proactive"],
    )
    results.append(f"🧠 Intelligence insights stored: {intel_id[:12]}...")
async def _capture_session_metrics(db, tags: list[str], results: list[str]) -> None:
    """Store session duration, success rate and checkpoint count, if available.

    Does nothing when session management is unavailable or the checkpoint
    yields no statistics. Failures are reported in ``results``, not raised.

    Args:
        db: Reflection database; its ``store_reflection`` coroutine is awaited.
        tags: Base tags; ``session-metrics`` is added.
        results: Report list that receives a confirmation or warning line.
    """
    if not SESSION_MANAGEMENT_AVAILABLE:
        return

    try:
        session_stats = checkpoint_session().get("session_stats", {})
        if not session_stats:
            return

        duration = session_stats.get("duration_minutes", 0)
        success_rate = session_stats.get("success_rate", 0)
        checkpoints = session_stats.get("total_checkpoints", 0)
        detail_summary = (
            f"Session metrics - Duration: {duration}min, "
            f"Success rate: {success_rate:.1f}%, "
            f"Checkpoints: {checkpoints}"
        )

        detail_id = await db.store_reflection(
            detail_summary,
            [*tags, "session-metrics"],
        )
        results.append(f"📊 Session metrics stored: {detail_id[:12]}...")
    except Exception as exc:
        results.append(f"⚠️ Session metrics capture failed: {str(exc)[:50]}...")
async def capture_session_insights(quality_score: float) -> list[str]:
    """Capture and store session insights with conversation summarization.

    Summarizes the conversation, stores the combined insights as the main
    session reflection, then stores flow analysis, intelligence insights and
    session metrics as additional reflections.

    Args:
        quality_score: Session quality score driving insights and tags.

    Returns:
        Report lines describing what was stored. When reflection storage is
        unavailable, or storing fails, the lines explain that instead.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return [
            "⚠️ Reflection storage not available - install dependencies: pip install duckdb transformers",
        ]

    report: list[str] = []
    try:
        # Summarize the conversation and assemble the insight sentences.
        conversation_summary = await summarize_current_conversation()
        insights = await _generate_basic_insights(quality_score, conversation_summary)
        await _add_project_context_insights(insights)
        _add_session_health_insights(insights, quality_score)

        # Store the main session reflection.
        session_summary = ". ".join(insights)
        tags = _generate_session_tags(quality_score)

        db = await get_reflection_database()
        reflection_id = await db.store_reflection(session_summary, tags)

        report += [
            "✅ Session insights automatically captured and stored",
            f"🆔 Reflection ID: {reflection_id[:12]}...",
            f"📝 Summary: {session_summary[:80]}...",
            f"🏷️ Tags: {', '.join(tags)}",
        ]

        # The enhanced captures are optional: a failure here must not
        # prevent the session metrics below from being stored.
        try:
            await _capture_flow_analysis(db, tags, report)
            await _capture_intelligence_insights(db, tags, report)
        except Exception as exc:
            report.append(f"⚠️ Phase 3 insight capture failed: {str(exc)[:50]}...")

        # Store additional detailed context
        await _capture_session_metrics(db, tags, report)

    except Exception as exc:
        report += [
            f"❌ Insight capture failed: {str(exc)[:60]}...",
            "💡 Manual reflection storage still available via store_reflection tool",
        ]

    return report
1870def _create_empty_summary() -> dict[str, Any]:
1871 """Create empty conversation summary structure."""
1872 return {
1873 "key_topics": [],
1874 "decisions_made": [],
1875 "next_steps": [],
1876 "problems_solved": [],
1877 "code_changes": [],
1878 }
1881def _extract_topics_from_content(content: str) -> set[str]:
1882 """Extract topics from reflection content."""
1883 topics = set()
1884 if "project context:" in content:
1885 context_part = content.split("project context:")[1].split(".")[0]
1886 topics.update(word.strip() for word in context_part.split(","))
1887 return topics
1890def _extract_decisions_from_content(content: str) -> list[str]:
1891 """Extract decisions from reflection content."""
1892 decisions = []
1893 if "excellent" in content:
1894 decisions.append("Maintaining productive workflow patterns")
1895 elif "attention" in content:
1896 decisions.append("Identified areas needing workflow optimization")
1897 elif "good progress" in content:
1898 decisions.append("Steady development progress confirmed")
1899 return decisions
1902def _extract_next_steps_from_content(content: str) -> list[str]:
1903 """Extract next steps from reflection content."""
1904 next_steps = []
1905 if "priority:" in content:
1906 priority_part = content.split("priority:")[1].split(".")[0]
1907 if priority_part.strip():
1908 next_steps.append(priority_part.strip())
1909 return next_steps
async def _process_recent_reflections(db, summary: dict[str, Any]) -> None:
    """Fill summary sections from the latest "checkpoint" reflections.

    Searches the database for up to five reflections, lower-cases each one's
    content and feeds it to the topic, decision and next-step extractors. The
    summary is left untouched when the search returns nothing.

    Args:
        db: Reflection database exposing an awaitable ``search_reflections``.
        summary: Summary dict updated in place ("key_topics" capped at five
            entries, "decisions_made" and "next_steps" at three).
    """
    reflections = await db.search_reflections("checkpoint", limit=5)
    if not reflections:
        return

    found_topics: set[str] = set()
    found_decisions: list[str] = []
    found_steps: list[str] = []

    for entry in reflections:
        text = entry["content"].lower()
        found_topics.update(_extract_topics_from_content(text))
        found_decisions += _extract_decisions_from_content(text)
        found_steps += _extract_next_steps_from_content(text)

    summary["key_topics"] = list(found_topics)[:5]
    summary["decisions_made"] = found_decisions[:3]
    summary["next_steps"] = found_steps[:3]
1935def _add_current_session_context(summary: dict[str, Any]) -> None:
1936 """Add current session context to summary."""
1937 current_dir = Path(os.environ.get("PWD", Path.cwd()))
1938 if (current_dir / "session_mgmt_mcp").exists():
1939 summary["key_topics"].append("session-mgmt-mcp development")
1942def _ensure_summary_defaults(summary: dict[str, Any]) -> None:
1943 """Ensure summary has default values if empty."""
1944 if not summary["key_topics"]:
1945 summary["key_topics"] = [
1946 "session management",
1947 "workflow optimization",
1948 ]
1950 if not summary["decisions_made"]:
1951 summary["decisions_made"] = ["Proceeding with current development approach"]
1953 if not summary["next_steps"]:
1954 summary["next_steps"] = ["Continue with regular checkpoint monitoring"]
1957def _get_fallback_summary() -> dict[str, Any]:
1958 """Get fallback summary when reflection processing fails."""
1959 return {
1960 "key_topics": ["development session", "workflow management"],
1961 "decisions_made": ["Maintaining current session approach"],
1962 "next_steps": ["Continue monitoring session quality"],
1963 "problems_solved": ["Session management optimization"],
1964 "code_changes": ["Enhanced checkpoint functionality"],
1965 }
1968def _get_error_summary(error: Exception) -> dict[str, Any]:
1969 """Get error summary when conversation analysis fails."""
1970 return {
1971 "key_topics": ["session analysis"],
1972 "decisions_made": ["Continue current workflow"],
1973 "next_steps": ["Regular quality monitoring"],
1974 "problems_solved": [],
1975 "code_changes": [],
1976 "error": str(error),
1977 }
async def summarize_current_conversation() -> dict[str, Any]:
    """Summarize the current conversation from stored reflections.

    Returns:
        A summary dict. When reflection tools are unavailable it is the empty
        structure; when reflection processing raises it is the fallback
        summary; any other failure yields the error summary.
    """
    try:
        summary = _create_empty_summary()
        if not REFLECTION_TOOLS_AVAILABLE:
            return summary

        try:
            database = await get_reflection_database()
            await _process_recent_reflections(database, summary)
            _add_current_session_context(summary)
            _ensure_summary_defaults(summary)
        except Exception:
            # Any failure while reading reflections switches to the canned summary.
            return _get_fallback_summary()

        return summary
    except Exception as exc:
        return _get_error_summary(exc)
2000def _extract_quality_scores(reflections: list) -> list[float]:
2001 """Extract quality scores from reflection content."""
2002 quality_scores = []
2004 for reflection in reflections:
2005 try:
2006 if "quality score:" in reflection["content"]:
2007 score_text = (
2008 reflection["content"].split("quality score:")[1].split("/")[0]
2009 )
2010 score = float(score_text.strip())
2011 quality_scores.append(score)
2012 except (ValueError, IndexError, AttributeError):
2013 continue
2015 return quality_scores
2018def _analyze_quality_trend(quality_scores: list[float]) -> tuple[str, list[str], bool]:
2019 """Analyze quality trend and generate alerts."""
2020 quality_alerts = []
2021 quality_trend = "stable"
2022 recommend_checkpoint = False
2024 if len(quality_scores) < 3:
2025 return quality_trend, quality_alerts, recommend_checkpoint
2027 # Trend analysis
2028 recent_avg = (
2029 sum(quality_scores[:2]) / 2 if len(quality_scores) >= 2 else quality_scores[0]
2030 )
2031 older_avg = sum(quality_scores[2:]) / len(quality_scores[2:])
2033 if recent_avg < older_avg - 10:
2034 quality_trend = "declining"
2035 quality_alerts.append("Quality trend declining - consider workflow review")
2036 recommend_checkpoint = True
2037 elif recent_avg > older_avg + 5:
2038 quality_trend = "improving"
2039 quality_alerts.append("Quality trend improving - maintain current patterns")
2041 # Early warning triggers
2042 if recent_avg < 50:
2043 quality_alerts.append("URGENT: Session quality critically low")
2044 recommend_checkpoint = True
2045 elif recent_avg < 70:
2046 quality_alerts.append("WARNING: Session quality below optimal")
2047 recommend_checkpoint = True
2049 return quality_trend, quality_alerts, recommend_checkpoint
2052def _check_workflow_drift(quality_scores: list[float]) -> tuple[list[str], bool]:
2053 """Check for workflow drift indicators."""
2054 quality_alerts = []
2055 recommend_checkpoint = False
2057 if len(quality_scores) >= 4:
2058 variance = max(quality_scores) - min(quality_scores)
2059 if variance > 30:
2060 quality_alerts.append(
2061 "High quality variance detected - workflow inconsistency",
2062 )
2063 recommend_checkpoint = True
2065 return quality_alerts, recommend_checkpoint
async def _perform_quality_analysis() -> tuple[str, list[str], bool]:
    """Run trend and drift analysis over recently stored quality scores.

    Searches the reflection database for up to five "quality score"
    reflections and analyzes the scores extracted from them.

    Returns:
        A ``(trend, alerts, recommend_checkpoint)`` tuple. If anything raises,
        an "unavailable" alert is added and the values gathered so far are
        returned.
    """
    alerts: list[str] = []
    trend = "stable"
    checkpoint_needed = False

    try:
        database = await get_reflection_database()
        matches = await database.search_reflections("quality score", limit=5)
        scores = _extract_quality_scores(matches)

        if scores:
            trend, trend_alerts, trend_flag = _analyze_quality_trend(scores)
            alerts.extend(trend_alerts)
            checkpoint_needed = trend_flag

            drift_alerts, drift_flag = _check_workflow_drift(scores)
            alerts.extend(drift_alerts)
            checkpoint_needed = checkpoint_needed or drift_flag
    except Exception:
        alerts.append("Quality monitoring analysis unavailable")

    return trend, alerts, checkpoint_needed
2097def _get_quality_error_result(error: Exception) -> dict[str, Any]:
2098 """Get error result for quality monitoring failure."""
2099 return {
2100 "quality_trend": "unknown",
2101 "alerts": ["Quality monitoring failed"],
2102 "recommend_checkpoint": False,
2103 "monitoring_active": False,
2104 "error": str(error),
2105 }
async def monitor_proactive_quality() -> dict[str, Any]:
    """Report the quality trend, alerts and checkpoint recommendation.

    Returns:
        A dict with "quality_trend", "alerts", "recommend_checkpoint" and
        "monitoring_active". Without reflection tools the neutral defaults
        ("stable", no alerts, no checkpoint) are reported; on failure the
        error result is returned instead.
    """
    try:
        if REFLECTION_TOOLS_AVAILABLE:
            trend, alerts, checkpoint_needed = await _perform_quality_analysis()
        else:
            trend, alerts, checkpoint_needed = "stable", [], False

        return {
            "quality_trend": trend,
            "alerts": alerts,
            "recommend_checkpoint": checkpoint_needed,
            "monitoring_active": True,
        }
    except Exception as exc:
        return _get_quality_error_result(exc)
async def analyze_advanced_context_metrics() -> dict[str, Any]:
    """Return advanced context metrics.

    Returns:
        A dict of fixed values; no measurement is performed and the token
        estimate is a placeholder.
    """
    metrics: dict[str, Any] = {}
    metrics["estimated_tokens"] = 0  # Placeholder for actual token counting
    metrics["context_density"] = "moderate"
    metrics["conversation_depth"] = "active"
    return metrics
async def analyze_token_usage_patterns() -> dict[str, Any]:
    """Estimate context usage from the stored conversation count.

    This is a heuristic: the number of stored conversations stands in for
    conversation length. Compaction is always recommended, because this
    analysis runs during checkpoints.

    Returns:
        A dict with "needs_attention", "status", "estimated_length",
        "recommend_compact", "recommend_clear" and "confidence". If the
        analysis raises, a dict with status "analysis_failed" and an "error"
        message is returned instead.
    """
    try:
        total_conversations = 0

        if REFLECTION_TOOLS_AVAILABLE:
            try:
                db = await get_reflection_database()
                stats = await db.get_stats()
                total_conversations = stats.get("conversations_count", 0)
            except Exception:
                # Best effort: without statistics the analysis proceeds as if
                # no conversations were stored.
                total_conversations = 0

        # Estimate context usage from the amount of stored history; the
        # highest matching tier wins.
        estimated_length = "moderate"
        recommend_clear = False
        if total_conversations > 20:
            # Extremely long - may need clear after compact
            estimated_length = "extremely long"
            recommend_clear = True
        elif total_conversations > 10:
            estimated_length = "very long"
        elif total_conversations > 3:
            estimated_length = "extensive"

        # Compaction is ALWAYS recommended during checkpoints: they typically
        # happen in long sessions where context is an issue, so the result is
        # flagged as needing attention regardless of the tier above.
        if estimated_length == "moderate":
            estimated_length = "checkpoint-session"

        return {
            "needs_attention": True,
            "status": "needs optimization",
            "estimated_length": estimated_length,
            "recommend_compact": True,
            "recommend_clear": recommend_clear,
            "confidence": "heuristic",
        }

    except Exception as e:
        return {
            "needs_attention": False,
            "status": "analysis_failed",
            "estimated_length": "unknown",
            "recommend_compact": False,
            "recommend_clear": False,
            "error": str(e),
        }
async def analyze_conversation_flow() -> dict[str, Any]:
    """Classify the session flow from recent "session checkpoint" reflections.

    The pattern is chosen by keyword: any reflection mentioning "excellent"
    wins, then any mentioning "attention", otherwise steady progress.

    Returns:
        A dict with "pattern_type", "recommendations" and "confidence". The
        outer failure path returns "analysis_failed" with an "error" message
        and no "confidence" key.
    """
    try:
        if not REFLECTION_TOOLS_AVAILABLE:
            pattern = "basic_session"
            advice = ["Enable reflection tools for advanced flow analysis"]
        else:
            try:
                database = await get_reflection_database()
                checkpoints = await database.search_reflections(
                    "session checkpoint",
                    limit=5,
                )

                def mentions(keyword: str) -> bool:
                    """Tell whether any checkpoint text contains the keyword."""
                    return any(
                        keyword in item["content"].lower() for item in checkpoints
                    )

                if not checkpoints:
                    pattern = "new_session"
                    advice = [
                        "Establish workflow patterns through regular checkpoints",
                    ]
                elif mentions("excellent"):
                    pattern = "productive_development"
                    advice = [
                        "Continue current productive workflow",
                        "Consider documenting successful patterns",
                        "Maintain current checkpoint frequency",
                    ]
                elif mentions("attention"):
                    pattern = "optimization_needed"
                    advice = [
                        "Review recent workflow changes",
                        "Consider more frequent checkpoints",
                        "Use search tools to find successful patterns",
                    ]
                else:
                    pattern = "steady_progress"
                    advice = [
                        "Maintain current workflow patterns",
                        "Consider periodic workflow evaluation",
                    ]
            except Exception:
                pattern = "analysis_unavailable"
                advice = [
                    "Use regular checkpoints to establish workflow patterns",
                ]

        return {
            "pattern_type": pattern,
            "recommendations": advice,
            "confidence": "pattern_based",
        }
    except Exception as exc:
        return {
            "pattern_type": "analysis_failed",
            "recommendations": ["Use basic workflow patterns"],
            "error": str(exc),
        }
async def analyze_memory_patterns(db, conv_count: int) -> dict[str, Any]:
    """Describe how much conversation history exists and suggest next steps.

    Args:
        db: Reflection database handle (not consulted by this function).
        conv_count: Number of stored conversations.

    Returns:
        A dict with a "summary" string and "proactive_suggestions" list chosen
        by history size (0, under 5, under 20, or more). On failure the dict
        also carries an "error" message.
    """
    try:
        if conv_count == 0:
            summary = "New session - no historical patterns yet"
            suggestions = [
                "Start building conversation history for better insights",
            ]
        elif conv_count < 5:
            summary = f"{conv_count} conversations stored - building pattern recognition"
            suggestions = [
                "Continue regular checkpoints to build session intelligence",
                "Use store_reflection for important insights",
            ]
        elif conv_count < 20:
            summary = f"{conv_count} conversations stored - developing patterns"
            suggestions = [
                "Use reflect_on_past to leverage growing knowledge base",
                "Search previous solutions before starting new implementations",
            ]
        else:
            summary = f"{conv_count} conversations - rich pattern recognition available"
            suggestions = [
                "Leverage extensive history with targeted searches",
                "Consider workflow optimization based on successful patterns",
                "Use conversation history to accelerate problem-solving",
            ]

        return {"summary": summary, "proactive_suggestions": suggestions}
    except Exception as exc:
        return {
            "summary": "Memory analysis unavailable",
            "proactive_suggestions": [
                "Use basic memory tools for conversation tracking",
            ],
            "error": str(exc),
        }
async def analyze_project_workflow_patterns(current_dir: Path) -> dict[str, Any]:
    """Recommend workflows based on marker files found in a project directory.

    Args:
        current_dir: Directory probed for test folders, ``.git``, Python and
            Node manifests, and Docker files.

    Returns:
        A dict with "workflow_recommendations" and "project_characteristics"
        (the detected boolean flags). On failure a dict with one generic
        recommendation and an "error" message is returned.
    """
    try:

        def present(*names: str) -> bool:
            """Tell whether any of the given entries exists in the directory."""
            return any((current_dir / name).exists() for name in names)

        # Detect project characteristics
        traits = {
            "has_tests": present("tests", "test"),
            "has_git": present(".git"),
            "has_python": present("pyproject.toml", "requirements.txt"),
            "has_node": present("package.json"),
            "has_docker": present("Dockerfile", "docker-compose.yml"),
        }

        tips: list[str] = []
        if traits["has_tests"]:
            tips += [
                "Use targeted test commands for specific test scenarios",
                "Consider test-driven development workflow with regular testing",
            ]
        if traits["has_git"]:
            tips += [
                "Leverage git context for branch-specific development",
                "Use commit messages to track progress patterns",
            ]
        if traits["has_python"] and traits["has_tests"]:
            tips.append(
                "Python+Testing: Consider pytest workflows with coverage analysis",
            )
        if traits["has_node"]:
            tips.append(
                "Node.js project: Leverage npm/yarn scripts in development workflow",
            )
        if traits["has_docker"]:
            tips.append(
                "Containerized project: Consider container-based development workflows",
            )

        # Generic advice when nothing specific was detected
        if not tips:
            tips.append(
                "Establish project-specific workflow patterns through regular checkpoints",
            )

        return {
            "workflow_recommendations": tips,
            "project_characteristics": traits,
        }
    except Exception as exc:
        return {
            "workflow_recommendations": ["Use basic project workflow patterns"],
            "error": str(exc),
        }
2410def _get_time_based_recommendations(hour: int) -> list[str]:
2411 """Get time-based intelligent recommendations."""
2412 recommendations = []
2414 if 9 <= hour <= 11:
2415 recommendations.append(
2416 "Morning session: Consider high-focus tasks and planning",
2417 )
2418 elif 13 <= hour <= 15:
2419 recommendations.append(
2420 "Afternoon session: Good time for implementation and testing",
2421 )
2422 elif hour >= 18:
2423 recommendations.append(
2424 "Evening session: Consider review and documentation tasks",
2425 )
2427 return recommendations
2430def _extract_quality_scores_from_reflections(reflections: list) -> list[float]:
2431 """Extract quality scores from reflection content for trend analysis."""
2432 scores = []
2434 for reflection in reflections:
2435 if "quality score:" in reflection["content"]:
2436 try:
2437 score_text = (
2438 reflection["content"].split("quality score:")[1].split("/")[0]
2439 )
2440 score = float(score_text.strip())
2441 scores.append(score)
2442 except (ValueError, IndexError, AttributeError):
2443 continue
2445 return scores
2448def _generate_quality_trend_recommendations(scores: list[float]) -> list[str]:
2449 """Generate recommendations based on quality score trends."""
2450 if not scores:
2451 return []
2453 avg_score = sum(scores) / len(scores)
2455 if avg_score > 80:
2456 return ["Excellent session trend: Maintain current productive patterns"]
2457 if avg_score < 60:
2458 return [
2459 "Session quality declining: Review workflow and take corrective actions",
2460 ]
2461 return ["Steady session progress: Consider optimization opportunities"]
async def _analyze_reflection_based_intelligence() -> list[str]:
    """Derive trend recommendations from the latest "checkpoint" reflections.

    Returns:
        Recommendations based on the quality scores found in up to three
        reflections; an empty list when reflection tools are unavailable or
        nothing was found; a single hint when the lookup raises.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return []

    try:
        database = await get_reflection_database()
        checkpoints = await database.search_reflections("checkpoint", limit=3)
        if not checkpoints:
            return []

        scores = _extract_quality_scores_from_reflections(checkpoints)
        return _generate_quality_trend_recommendations(scores)
    except Exception:
        return ["Enable reflection analysis for session trend intelligence"]
2483def _ensure_default_recommendations(priority_actions: list[str]) -> list[str]:
2484 """Ensure there are default recommendations if none generated."""
2485 if not priority_actions:
2486 return ["Establish session intelligence through regular checkpoint patterns"]
2487 return priority_actions
2490def _get_intelligence_error_result(error: Exception) -> dict[str, Any]:
2491 """Get error result for intelligence generation failure."""
2492 return {
2493 "priority_actions": ["Use basic session management patterns"],
2494 "intelligence_level": "fallback",
2495 "error": str(error),
2496 }
async def generate_session_intelligence() -> dict[str, Any]:
    """Collect proactive priority actions for the current session.

    Combines the time-of-day hint with reflection-based recommendations and
    falls back to a stock recommendation when neither produced anything.

    Returns:
        A dict with "priority_actions", "intelligence_level" and an ISO
        "timestamp"; on failure the intelligence error result.
    """
    try:
        now = datetime.now()

        actions = [
            *_get_time_based_recommendations(now.hour),
            *(await _analyze_reflection_based_intelligence()),
        ]

        return {
            "priority_actions": _ensure_default_recommendations(actions),
            "intelligence_level": "proactive",
            "timestamp": now.isoformat(),
        }
    except Exception as exc:
        return _get_intelligence_error_result(exc)
async def _analyze_token_usage_recommendations(results: list[str]) -> None:
    """Append context-usage findings and compaction advice to the results.

    Args:
        results: Output lines, extended in place.
    """
    analysis = await analyze_token_usage_patterns()

    if not analysis["needs_attention"]:
        results.append(f"✅ Context usage: {analysis['status']}")
        return

    results.append(f"⚠️ Context usage: {analysis['status']}")
    results.append(
        f" Estimated conversation length: {analysis['estimated_length']}",
    )

    # Compaction advice takes priority over the optional /clear hint.
    if analysis["recommend_compact"]:
        results.extend(
            [
                "🚨 CRITICAL AUTO-RECOMMENDATION: Context compaction required",
                "🔄 This checkpoint has prepared conversation summary for compaction",
                "💡 Compaction should be applied automatically after this checkpoint",
            ],
        )

    if analysis["recommend_clear"]:
        results.append(
            "🆕 AUTO-RECOMMENDATION: Consider /clear for fresh context after compaction",
        )
async def _analyze_conversation_flow_recommendations(results: list[str]) -> None:
    """Append the session flow pattern and up to three flow recommendations.

    Args:
        results: Output lines, extended in place.
    """
    flow = await analyze_conversation_flow()
    results.append(f"📊 Session flow: {flow['pattern_type']}")

    suggestions = flow["recommendations"]
    if suggestions:
        results.append("🎯 Flow-based recommendations:")
        results.extend(f" • {suggestion}" for suggestion in suggestions[:3])
async def _analyze_memory_recommendations(results: list[str]) -> None:
    """Append memory insights and up to two proactive suggestions.

    Does nothing when reflection tools are unavailable. If the memory lookup
    raises, a single generic line is appended instead.

    Args:
        results: Output lines, extended in place.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return

    try:
        database = await get_reflection_database()
        statistics = await database.get_stats()
        stored_count = statistics.get("conversations_count", 0)

        insights = await analyze_memory_patterns(database, stored_count)
        results.append(f"📚 Memory insights: {insights['summary']}")

        suggestions = insights["proactive_suggestions"]
        if suggestions:
            results.append("💡 Proactive suggestions:")
            results.extend(f" • {suggestion}" for suggestion in suggestions[:2])
    except Exception:
        results.append("📚 Memory system available for conversation search")
async def _analyze_project_workflow_recommendations(results: list[str]) -> None:
    """Append up to two workflow optimizations for the working directory.

    The working directory is read from the ``PWD`` environment variable,
    falling back to the process's current directory.

    Args:
        results: Output lines, extended in place.
    """
    working_dir = Path(os.environ.get("PWD", Path.cwd()))
    insights = await analyze_project_workflow_patterns(working_dir)

    optimizations = insights["workflow_recommendations"]
    if optimizations:
        results.append("🚀 Workflow optimizations:")
        results.extend(f" • {item}" for item in optimizations[:2])
async def _analyze_session_intelligence_recommendations(results: list[str]) -> None:
    """Append up to three session-intelligence priority actions.

    Args:
        results: Output lines, extended in place.
    """
    intelligence = await generate_session_intelligence()

    actions = intelligence["priority_actions"]
    if actions:
        results.append("\n🧠 Session Intelligence:")
        results.extend(f" • {action}" for action in actions[:3])
async def _analyze_quality_monitoring_recommendations(results: list[str]) -> None:
    """Append the quality trend, up to two alerts and a checkpoint hint.

    Nothing is appended when monitoring is reported as inactive.

    Args:
        results: Output lines, extended in place.
    """
    monitoring = await monitor_proactive_quality()
    if not monitoring["monitoring_active"]:
        return

    results.append(f"\n📊 Quality Trend: {monitoring['quality_trend']}")

    alerts = monitoring["alerts"]
    if alerts:
        results.append("⚠️ Quality Alerts:")
        results.extend(f" • {alert}" for alert in alerts[:2])

    if monitoring["recommend_checkpoint"]:
        results.append("🔄 PROACTIVE RECOMMENDATION: Consider immediate checkpoint")
2616async def _add_fallback_recommendations(results: list[str], error: Exception) -> None:
2617 """Add fallback recommendations when analysis fails."""
2618 results.append(f"❌ Advanced context analysis failed: {str(error)[:60]}...")
2619 results.append("💡 Falling back to basic context management recommendations")
2621 # Fallback to basic recommendations
2622 results.append("🎯 Basic context actions:")
2623 results.append(" • Use /compact for conversation summarization")
2624 results.append(" • Use /clear for fresh context on new topics")
2625 results.append(" • Use search tools to retrieve relevant discussions")
async def analyze_context_usage() -> list[str]:
    """Run every context-analysis component and collect their output lines.

    Returns:
        The accumulated report lines. If any component raises, the fallback
        recommendations are appended to whatever was gathered so far.
    """
    report: list[str] = []

    try:
        report.append("🔍 Advanced context analysis and optimization...")

        # The metrics call is awaited for parity with the other components;
        # its return value is not used here.
        await analyze_advanced_context_metrics()

        components = (
            _analyze_token_usage_recommendations,
            _analyze_conversation_flow_recommendations,
            _analyze_memory_recommendations,
            _analyze_project_workflow_recommendations,
            _analyze_session_intelligence_recommendations,
            _analyze_quality_monitoring_recommendations,
        )
        for component in components:
            await component(report)
    except Exception as exc:
        await _add_fallback_recommendations(report, exc)

    return report
async def _perform_quality_assessment() -> tuple[int, dict]:
    """Calculate the session quality and expose its total score.

    Returns:
        A ``(total_score, quality_data)`` tuple, where the score is read from
        the "total_score" entry of the calculated quality data.
    """
    assessment = await calculate_quality_score()
    return assessment["total_score"], assessment
2659async def _format_quality_results(
2660 quality_score: int,
2661 quality_data: dict,
2662 checkpoint_result: dict | None = None,
2663) -> list[str]:
2664 """Format quality assessment results for display."""
2665 output = []
2667 # Quality status
2668 if quality_score >= 80:
2669 output.append(f"✅ Session quality: EXCELLENT (Score: {quality_score}/100)")
2670 elif quality_score >= 60:
2671 output.append(f"✅ Session quality: GOOD (Score: {quality_score}/100)")
2672 else:
2673 output.append(
2674 f"⚠️ Session quality: NEEDS ATTENTION (Score: {quality_score}/100)",
2675 )
2677 # Quality breakdown
2678 output.append("\n📈 Quality breakdown:")
2679 breakdown = quality_data["breakdown"]
2680 output.append(f" • Project health: {breakdown['project_health']:.1f}/40")
2681 output.append(f" • Permissions: {breakdown['permissions']:.1f}/20")
2682 output.append(f" • Session tools: {breakdown['session_management']:.1f}/20")
2683 output.append(f" • Tool availability: {breakdown['tools']:.1f}/20")
2685 # Recommendations
2686 recommendations = quality_data["recommendations"]
2687 if recommendations:
2688 output.append("\n💡 Recommendations:")
2689 for rec in recommendations[:3]:
2690 output.append(f" • {rec}")
2692 # Session management specific results
2693 if checkpoint_result:
2694 strengths = checkpoint_result.get("strengths", [])
2695 if strengths:
2696 output.append("\n🌟 Session strengths:")
2697 for strength in strengths[:3]:
2698 output.append(f" • {strength}")
2700 session_stats = checkpoint_result.get("session_stats", {})
2701 if session_stats:
2702 output.append("\n⏱️ Session progress:")
2703 output.append(
2704 f" • Duration: {session_stats.get('duration_minutes', 0)} minutes",
2705 )
2706 output.append(
2707 f" • Checkpoints: {session_stats.get('total_checkpoints', 0)}",
2708 )
2709 output.append(
2710 f" • Success rate: {session_stats.get('success_rate', 0):.1f}%",
2711 )
2713 return output
2716async def _perform_git_checkpoint(current_dir: Path, quality_score: int) -> list[str]:
2717 """Handle git operations for checkpoint commit."""
2718 output = []
2719 output.append("\n" + "=" * 50)
2720 output.append("📦 Git Checkpoint Commit")
2721 output.append("=" * 50)
2723 git_dir = current_dir / ".git"
2724 if not (git_dir.exists() and git_dir.is_dir()):
2725 output.append("\nℹ️ Not a git repository - skipping commit")
2726 return output
2728 try:
2729 # Check git status
2730 status_result = subprocess.run(
2731 ["git", "status", "--porcelain"],
2732 check=False,
2733 capture_output=True,
2734 text=True,
2735 cwd=current_dir,
2736 )
2738 if status_result.returncode != 0:
2739 output.append(f"\n⚠️ Git status check failed: {status_result.stderr}")
2740 return output
2742 status_lines = (
2743 status_result.stdout.strip().split("\n")
2744 if status_result.stdout.strip()
2745 else []
2746 )
2747 modified_files, untracked_files = _parse_git_status(status_lines)
2749 if not modified_files and not untracked_files:
2750 output.append("\n✅ Working directory is clean - no changes to commit")
2751 return output
2753 output.append(
2754 f"\n📝 Found {len(modified_files)} modified files and {len(untracked_files)} untracked files",
2755 )
2757 # Handle untracked files
2758 if untracked_files:
2759 output.extend(_format_untracked_files(untracked_files))
2761 # Stage and commit modified files
2762 if modified_files:
2763 output.extend(
2764 _stage_and_commit_files(current_dir, modified_files, quality_score),
2765 )
2766 elif untracked_files:
2767 output.append("\nℹ️ No staged changes to commit")
2768 output.append(
2769 " 💡 Add untracked files with 'git add' if you want to include them",
2770 )
2772 except Exception as e:
2773 output.append(f"\n⚠️ Git operations error: {e}")
2775 return output
2778def _parse_git_status(status_lines: list[str]) -> tuple[list[str], list[str]]:
2779 """Parse git status output into modified and untracked files."""
2780 modified_files = []
2781 untracked_files = []
2783 for line in status_lines:
2784 if line:
2785 status = line[:2]
2786 filepath = line[3:]
2787 if status == "??":
2788 untracked_files.append(filepath)
2789 elif status.strip():
2790 modified_files.append(filepath)
2792 return modified_files, untracked_files
2795def _format_untracked_files(untracked_files: list[str]) -> list[str]:
2796 """Format untracked files display."""
2797 output = []
2798 output.append("\n🆕 Untracked files found:")
2800 for file in untracked_files[:10]: # Limit to first 10 for display
2801 output.append(f" • {file}")
2803 if len(untracked_files) > 10:
2804 output.append(f" ... and {len(untracked_files) - 10} more")
2806 output.append("\n⚠️ Please manually review and add untracked files if needed:")
2807 output.append(" Use: git add <file> for files you want to include")
2809 return output
def _stage_and_commit_files(
    current_dir: Path,
    modified_files: list[str],
    quality_score: int,
) -> list[str]:
    """Stage the given files and record them in a checkpoint commit.

    Args:
        current_dir: Directory in which the git commands are run.
        modified_files: Paths to stage, as reported by ``git status``.
        quality_score: Session quality score embedded in the commit message.

    Returns:
        Report lines describing the staging and commit outcome.
    """
    output = [f"\n✅ Staging {len(modified_files)} modified files..."]

    # Stage one path at a time and ignore individual failures (check=False).
    # "--" keeps a path that starts with "-" from being read as an option.
    for file in modified_files:
        subprocess.run(
            ["git", "add", "--", file],
            check=False,
            cwd=current_dir,
            capture_output=True,
        )

    # Check if there's anything to commit
    staged_result = subprocess.run(
        ["git", "diff", "--cached", "--name-only"],
        check=False,
        capture_output=True,
        text=True,
        cwd=current_dir,
    )

    if not staged_result.stdout.strip():
        output.append("\nℹ️ No staged changes to commit")
        return output

    # The project label is derived from the directory being committed; this
    # function receives no project name of its own.
    project_label = Path(current_dir).name or "unknown"
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    commit_message = (
        f"checkpoint: Session checkpoint - {timestamp}\n\n"
        "Automatic checkpoint commit via session-management MCP server\n"
        f"Project: {project_label}\n"
        f"Quality Score: {quality_score}/100"
    )

    commit_result = subprocess.run(
        ["git", "commit", "-m", commit_message],
        check=False,
        capture_output=True,
        text=True,
        cwd=current_dir,
    )

    if commit_result.returncode != 0:
        output.append(f"\n⚠️ Commit failed: {commit_result.stderr}")
        return output

    hash_result = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        check=False,
        capture_output=True,
        text=True,
        cwd=current_dir,
    )
    commit_hash = (
        hash_result.stdout.strip()[:8] if hash_result.returncode == 0 else "unknown"
    )

    output.append(f"\n✅ Checkpoint commit created: {commit_hash}")
    output.append(f" Message: checkpoint: Session checkpoint - {timestamp}")
    output.append(" 💡 Use 'git reset HEAD~1' to undo if needed")
    return output
@mcp.tool()
async def checkpoint(working_directory: str | None = None) -> str:
    """Perform mid-session quality checkpoint with workflow analysis and optimization recommendations.

    Args:
        working_directory: Directory to perform checkpoint analysis in (defaults to current directory)

    Returns:
        The checkpoint report as a single newline-joined string.
    """
    # Handle None parameter like other MCP functions
    if working_directory is None:
        working_directory = "."

    # Path(".").name is empty, so "." falls back to the tracked project name.
    project_name = (
        Path(working_directory).name
        if working_directory != "."
        else current_project or "Current Project"
    )

    # Honour an explicit directory for the analysis and git steps, as the
    # `status` tool does; otherwise fall back to PWD / the process cwd.
    if working_directory != ".":
        current_dir = Path(working_directory)
    else:
        current_dir = Path(os.environ.get("PWD", Path.cwd()))

    output = []
    output.append(
        f"🔍 Claude Session Checkpoint - {project_name}",
    )
    output.append("=" * 50)

    # Quality Assessment
    try:
        quality_score, quality_data = await _perform_quality_assessment()

        if SESSION_MANAGEMENT_AVAILABLE:
            output.append("\n📊 Running comprehensive quality assessment...")
            checkpoint_result = checkpoint_session()

            if checkpoint_result.get("checkpoint_passed", True):
                output.extend(
                    await _format_quality_results(
                        quality_score,
                        quality_data,
                        checkpoint_result,
                    ),
                )
            else:
                output.append("⚠️ Session quality issues detected:")
                for issue in checkpoint_result.get("issues_found", []):
                    output.append(f" • {issue}")
        else:
            output.append(
                "\n⚠️ Session management not available - performing basic checks",
            )
            output.extend(await _format_quality_results(quality_score, quality_data))

    except Exception as e:
        session_logger.exception(
            "Checkpoint error occurred",
            error=str(e),
            project=current_project,
        )
        output.append(f"❌ Checkpoint error: {e}")
        # The assessment itself may be what failed; keep the rest of the
        # checkpoint alive with a zero score rather than re-raising here.
        try:
            quality_score, quality_data = await _perform_quality_assessment()
        except Exception as fallback_error:
            quality_score, quality_data = 0, {}
            output.append(
                f"⚠️ Basic quality assessment unavailable: {fallback_error}",
            )
        else:
            output.append(f"\n📊 Basic quality assessment: {quality_score}/100")

    # Project Context
    project_context = await analyze_project_context(current_dir)
    context_score = sum(1 for detected in project_context.values() if detected)
    output.append(
        f"\n🎯 Project context: {context_score}/{len(project_context)} indicators",
    )

    # Dynamic recommendations
    if quality_score >= 80:
        output.append("\n💡 Excellent session - maintain current workflow")
    elif quality_score >= 60:
        output.append("\n💡 Good session - minor optimizations available")
    else:
        output.append("\n💡 Session needs attention - review recommendations above")

    output.append("🔄 Consider next checkpoint in 30-45 minutes")

    # Git operations
    output.extend(await _perform_git_checkpoint(current_dir, quality_score))

    # Session insights
    output.append("\n" + "=" * 50)
    output.append("🧠 Automatic Session Insights Capture")
    output.append("=" * 50)

    insights_results = await capture_session_insights(quality_score)
    output.extend(insights_results)

    # Context analysis
    output.append("\n" + "=" * 50)
    output.append("🔄 Context Management Analysis")
    output.append("=" * 50)

    context_results = await analyze_context_usage()
    output.extend(context_results)

    # Strategic compaction
    output.append("\n" + "=" * 50)
    output.append("📦 Strategic Compaction & Optimization")
    output.append("=" * 50)

    compaction_results = await perform_strategic_compaction()
    output.extend(compaction_results)

    # Auto-compaction
    output.append("\n" + "=" * 50)
    output.append("🔄 Automatic Context Compaction")
    output.append("=" * 50)

    try:
        auto_compact_result = await auto_compact()

        # Only surface the lines of the compaction report that mention
        # preserved/stored data or a required compaction.
        keywords = ("preserved", "stored", "compaction required", "/compact")
        for line in auto_compact_result.split("\n"):
            lowered = line.lower()
            if any(keyword in lowered for keyword in keywords):
                output.append(line)

        output.append("✅ Auto-compaction integrated into checkpoint workflow")
    except Exception as e:
        output.append(f"⚠️ Auto-compaction integration failed: {e}")
        output.append("💡 Manual /compact may be needed")

    output.append(
        f"\n✨ Enhanced checkpoint complete - {project_name} session optimized!",
    )
    output.append("🔄 Context compaction has been automatically triggered")

    return "\n".join(output)
@mcp.tool()
async def end() -> str:
    """End Claude session with cleanup, learning capture, and handoff file creation.

    Returns:
        The session-end report as a single newline-joined string. Errors
        raised while finalising are reported in the text, not propagated.
    """
    output = []
    output.append(f"🏁 Claude Session End - {current_project or 'Current Project'}")
    output.append("=" * 60)

    # Final Checkpoint
    if SESSION_MANAGEMENT_AVAILABLE:
        try:
            output.append("\n📊 Final session quality checkpoint...")
            final_checkpoint = checkpoint_session()

            # Calculate final quality score
            final_quality_data = await calculate_quality_score()
            final_quality_score = final_quality_data["total_score"]

            if final_checkpoint.get("checkpoint_passed"):
                if final_quality_score >= 80:
                    output.append("✅ Final session quality: EXCELLENT")
                elif final_quality_score >= 60:
                    output.append("✅ Final session quality: GOOD")
                else:
                    output.append("⚠️ Final session quality: NEEDS ATTENTION")
                output.append(f" 📈 Quality score: {final_quality_score}/100")
            else:
                output.append("⚠️ Final session quality issues detected")

            # Create handoff file; parents=True so a missing base directory
            # does not abort the handoff.
            handoff_dir = claude_dir / "sessions"
            handoff_dir.mkdir(parents=True, exist_ok=True)

            # One timestamp for both the file name and the document title.
            now = datetime.now()
            session_timestamp = now.strftime("%Y%m%d_%H%M%S")
            project_name = (
                current_project.replace("/", "_").replace(" ", "_")
                if current_project
                else "unknown"
            )
            handoff_file = (
                handoff_dir
                / f"{project_name}_mcp_session_handoff_{session_timestamp}.md"
            )

            working_dir = Path(os.environ.get("PWD", Path.cwd()))
            handoff_lines = [
                f"# {current_project} MCP Session Handoff - {now.strftime('%Y-%m-%d %H:%M:%S')}",
                "",
                "## Session Summary",
                f"- **Project**: {current_project}",
                f"- **Working Directory**: {working_dir}",
                "- **MCP Server**: Claude Session Management",
                f"- **Quality Score**: {final_quality_score}/100",
                "",
                "## Session Tools Used",
                "- init: ✅ Complete setup with UV sync",
                "- checkpoint: Used for quality monitoring",
                "- end: ✅ Comprehensive cleanup",
                "",
                "## Next Session Preparation",
                "- MCP server automatically available via .mcp.json",
                "- Use /session-management:init slash command for startup",
                "- Session continuity maintained through MCP integration",
                "",
                "## Project Context",
                "- Session managed via MCP server integration",
                "- All session management tools available as slash commands",
                "- Consistent workflow across all projects",
            ]
            handoff_content = "\n".join(handoff_lines) + "\n"

            # The document contains non-ASCII characters, so the encoding is
            # explicit instead of relying on the platform default.
            handoff_file.write_text(handoff_content, encoding="utf-8")

            output.append(f"📋 Session handoff created: {handoff_file}")

            # End session
            end_result = end_session()
            if end_result.get("session_ended_successfully"):
                output.append("✅ Session terminated successfully")

        except Exception as e:
            output.append(f"❌ Session end error: {e}")
    else:
        output.append("\n✅ Basic session cleanup completed")

    output.append(
        f"\n🙏 {current_project.upper() if current_project else 'SESSION'} COMPLETE!",
    )
    output.append(f"📅 Ended: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    output.append("\n🔄 Next session: Use /session-management:init slash command")
    output.append("🎉 MCP server will provide automatic session management!")

    return "\n".join(output)
async def health_check() -> dict[str, Any]:
    """Comprehensive health check for MCP server and toolkit availability.

    Returns:
        A dict with ``overall_healthy`` (bool), ``checks`` (name -> status
        text), ``warnings`` and ``errors`` (lists of messages).
    """
    checks: dict[str, str] = {}
    warnings: list[str] = []
    errors: list[str] = []
    health_status: dict[str, Any] = {
        "overall_healthy": True,
        "checks": checks,
        "warnings": warnings,
        "errors": errors,
    }

    # MCP Server health: nothing is probed here, the entry is informational.
    checks["mcp_server"] = "✅ Active"

    # Session management toolkit health
    checks["session_toolkit"] = (
        "✅ Available" if SESSION_MANAGEMENT_AVAILABLE else "⚠️ Limited"
    )
    if not SESSION_MANAGEMENT_AVAILABLE:
        warnings.append(
            "Session management toolkit not fully available",
        )

    # UV package manager health
    uv_available = shutil.which("uv") is not None
    checks["uv_manager"] = "✅ Available" if uv_available else "❌ Missing"
    if not uv_available:
        warnings.append("UV package manager not found")

    # Claude directory health: a failing validation is recorded as an error
    # so the remaining checks still run and the caller gets a full report.
    # NOTE(review): the return value of validate_claude_directory() is
    # ignored, as before — confirm it signals problems by raising.
    try:
        validate_claude_directory()
    except Exception as e:
        checks["claude_directory"] = "❌ Error"
        errors.append(f"Claude directory issue: {e}")
        health_status["overall_healthy"] = False
    else:
        checks["claude_directory"] = "✅ Valid"

    # Permissions system health
    try:
        permissions_status = permissions_manager.get_permission_status()
        checks["permissions_system"] = "✅ Active"
        checks["session_id"] = f"Active ({permissions_status['session_id']})"
    except Exception as e:
        checks["permissions_system"] = "❌ Error"
        errors.append(f"Permissions system issue: {e}")
        health_status["overall_healthy"] = False

    # Crackerjack integration health
    checks["crackerjack_integration"] = (
        "✅ Available" if CRACKERJACK_INTEGRATION_AVAILABLE else "⚠️ Not Available"
    )
    if not CRACKERJACK_INTEGRATION_AVAILABLE:
        warnings.append(
            "Crackerjack integration not available - quality monitoring disabled",
        )

    # Log health check results
    session_logger.info(
        "Health check completed",
        overall_healthy=health_status["overall_healthy"],
        warnings_count=len(warnings),
        errors_count=len(errors),
    )

    return health_status
3176async def _add_basic_status_info(output: list[str], current_dir: Path) -> None:
3177 """Add basic status information to output."""
3178 global current_project
3179 current_project = current_dir.name
3181 output.append(f"📁 Current project: {current_project}")
3182 output.append(f"🗂️ Working directory: {current_dir}")
3183 output.append("🌐 MCP server: Active (Claude Session Management)")
async def _add_health_status_info(output: list[str]) -> None:
    """Run the health check and append a readable summary to ``output``.

    Every individual check is listed; warnings and errors are each capped
    at the first three entries.
    """
    report = await health_check()

    verdict = "✅ HEALTHY" if report["overall_healthy"] else "⚠️ ISSUES DETECTED"
    output.append(
        f"\n🏥 System Health: {verdict}",
    )

    # One line per check, with the key turned into a title-cased label.
    for key, state in report["checks"].items():
        label = key.replace("_", " ").title()
        output.append(f" • {label}: {state}")

    # Show at most three warnings and three errors.
    warnings = report["warnings"]
    if warnings:
        output.append("\n⚠️ Health Warnings:")
        output.extend(f" • {message}" for message in warnings[:3])

    errors = report["errors"]
    if errors:
        output.append("\n❌ Health Errors:")
        output.extend(f" • {message}" for message in errors[:3])
async def _add_project_context_info(output: list[str], current_dir: Path) -> None:
    """Append the project maturity score and git worktree details.

    Worktree reporting is best-effort: any exception raised while gathering
    or formatting it is swallowed so the status report still completes.
    """
    from .utils.git_operations import get_worktree_info, list_worktrees

    indicators = await analyze_project_context(current_dir)
    detected_count = sum(1 for flag in indicators.values() if flag)
    output.append(f"\n📈 Project maturity: {detected_count}/{len(indicators)}")

    # Add worktree information
    try:
        info = get_worktree_info(current_dir)
        if not info:
            return

        worktrees = list_worktrees(current_dir)

        output.append("\n🌿 Git Worktree Information:")
        if info.is_main_worktree:
            output.append(
                f" 📍 Current: Main repository on '{info.branch}'",
            )
        else:
            output.append(f" 📍 Current: Worktree on '{info.branch}'")
            output.append(f" 📁 Path: {info.path}")

        if len(worktrees) > 1:
            output.append(f" 🌳 Total worktrees: {len(worktrees)}")
            siblings = [wt.branch for wt in worktrees if wt.path != info.path]
            if siblings:
                output.append(
                    f" 🔀 Other branches: {', '.join(siblings[:3])}",
                )
                extra = len(siblings) - 3
                if extra > 0:
                    output.append(f" ... and {extra} more")
            output.append(" 💡 Use 'git_worktree_list' to see all worktrees")
        else:
            output.append(
                " 💡 Use 'git_worktree_add <branch> <path>' to create parallel worktrees",
            )

        if info.is_detached:
            output.append(" ⚠️ Detached HEAD - consider checking out a branch")

    except Exception:
        # Silently handle worktree detection failures
        pass
def _add_permissions_info(output: list[str]) -> None:
    """Append the trusted-operations summary from the permissions manager."""
    snapshot = permissions_manager.get_permission_status()
    trusted = snapshot["trusted_operations"]

    output.append("\n🔐 Session Permissions:")
    output.append(
        f" 📊 Trusted operations: {snapshot['trusted_operations_count']}",
    )

    if not trusted:
        output.append(" ⚠️ No trusted operations yet - will prompt for permissions")
        return

    output.extend(f" ✅ {name.replace('_', ' ').title()}" for name in trusted)
3273def _add_basic_tools_info(output: list[str]) -> None:
3274 """Add basic MCP tools information to output."""
3275 output.append("\n🛠️ Available MCP Tools:")
3276 output.append("• init - Full session initialization")
3277 output.append("• checkpoint - Quality monitoring")
3278 output.append("• end - Complete cleanup")
3279 output.append("• status - This status report with health checks")
3280 output.append("• permissions - Manage trusted operations")
3281 output.append("• git_worktree_list - List all git worktrees")
3282 output.append("• git_worktree_add - Create new worktrees")
3283 output.append("• git_worktree_remove - Remove worktrees")
3284 output.append("• git_worktree_status - Comprehensive worktree status")
3285 output.append("• git_worktree_prune - Clean up stale references")
def _add_feature_status_info(output: list[str]) -> None:
    """Append a section for each optional feature whose flag is set.

    Sections appear in a fixed order: token optimization, multi-project
    coordination, advanced search. A feature that is unavailable adds nothing.
    """
    sections = (
        (
            TOKEN_OPTIMIZER_AVAILABLE,
            [
                "\n⚡ Token Optimization:",
                "• get_cached_chunk - Retrieve chunked response data",
                "• get_token_usage_stats - Token usage and savings metrics",
                "• optimize_memory_usage - Consolidate old conversations",
                "• Built-in response chunking and truncation",
            ],
        ),
        (
            MULTI_PROJECT_AVAILABLE,
            [
                "\n🔗 Multi-Project Coordination:",
                "• create_project_group - Create project groups for coordination",
                "• add_project_dependency - Define project relationships",
                "• search_across_projects - Search conversations across related projects",
                "• get_project_insights - Cross-project activity analysis",
            ],
        ),
        (
            ADVANCED_SEARCH_AVAILABLE,
            [
                "\n🔍 Advanced Search:",
                "• advanced_search - Faceted search with filtering",
                "• search_suggestions - Auto-completion suggestions",
                "• get_search_metrics - Search activity analytics",
                "• Built-in full-text indexing and highlighting",
            ],
        ),
    )

    for enabled, lines in sections:
        if enabled:
            output.extend(lines)
def _add_configuration_info(output: list[str]) -> None:
    """Append the configuration section and recent optimization statistics.

    When configuration support is unavailable, a single notice line is
    added instead. Statistics are best-effort: any failure while reading
    them is ignored so the status report is never broken by it.
    """
    if not CONFIG_AVAILABLE:
        output.append("\n❌ Token optimization not available (install tiktoken)")
        return

    output.extend(
        [
            "\n⚙️ Configuration:",
            "• pyproject.toml configuration support",
            "• Environment variable overrides",
            "• Configurable database, search, and optimization settings",
        ],
    )

    # Show current optimization stats if available
    try:
        from .token_optimizer import get_token_optimizer

        optimizer = get_token_optimizer()
        stats = optimizer.get_usage_stats(hours=24)

        had_requests = stats["status"] == "success" and stats["total_requests"] > 0
        if had_requests:
            cost = stats.get("estimated_cost_savings", {})
            if cost.get("savings_usd", 0) > 0:
                output.append(
                    f"• Last 24h savings: ${cost['savings_usd']:.4f} USD, {cost['estimated_tokens_saved']:,} tokens",
                )

        cached_chunks = len(optimizer.chunk_cache)
        if cached_chunks > 0:
            output.append(f"• Active cached chunks: {cached_chunks}")

    except Exception:
        pass  # Don't fail status if optimization stats fail
def _add_crackerjack_integration_info(output: list[str]) -> None:
    """Append the Crackerjack integration section to ``output``.

    Lists the enhanced commands, adds a note about the basic commands when
    a separate crackerjack server is detected, then the feature summary.
    A single notice is added when the integration is unavailable.
    """
    if not CRACKERJACK_INTEGRATION_AVAILABLE:
        output.append("\n⚠️ Crackerjack Integration: Not available")
        return

    output.extend(
        [
            "\n🔧 Crackerjack Integration (Enhanced):",
            "\n🎯 RECOMMENDED COMMANDS (Enhanced with Memory & Analytics):",
            "• /session-mgmt:crackerjack-run <command> - Smart execution with insights",
            "• /session-mgmt:crackerjack-history - View trends and patterns",
            "• /session-mgmt:crackerjack-metrics - Quality metrics over time",
            "• /session-mgmt:crackerjack-patterns - Test failure analysis",
            "• /session-mgmt:crackerjack-help - Complete command guide",
        ],
    )

    # Detect if basic crackerjack is also available
    other_servers = _detect_other_mcp_servers()
    if other_servers.get("crackerjack", False):
        output.extend(
            [
                "\n📋 Basic Commands (Raw Output Only):",
                "• /crackerjack:run <command> - Simple execution without memory",
                "💡 Use enhanced commands above for better development experience",
            ],
        )

    output.extend(
        [
            "\n🧠 Enhanced Features:",
            "• Automatic conversation memory integration",
            "• Quality metrics tracking and trends",
            "• Intelligent insights and recommendations",
            "• Test failure pattern detection",
        ],
    )
@mcp.tool()
async def status(working_directory: str | None = None) -> str:
    """Get current session status and project context information with health checks.

    Args:
        working_directory: Optional working directory override (defaults to PWD environment variable or current directory)

    Returns:
        The assembled status report as a single newline-joined string.
    """
    current_dir = (
        Path(working_directory)
        if working_directory
        else Path(os.environ.get("PWD", Path.cwd()))
    )

    report = [
        "📊 Claude Session Status via MCP Server",
        "=" * 40,
    ]

    # Sections that need awaiting, in display order.
    await _add_basic_status_info(report, current_dir)
    await _add_health_status_info(report)
    await _add_project_context_info(report, current_dir)

    # Remaining sections are synchronous and append in place.
    for add_section in (
        _add_permissions_info,
        _add_basic_tools_info,
        _add_feature_status_info,
        _add_configuration_info,
        _add_crackerjack_integration_info,
    ):
        add_section(report)

    return "\n".join(report)
3411@mcp.tool()
3412async def permissions(action: str = "status", operation: str | None = None) -> str:
3413 """Manage session permissions for trusted operations to avoid repeated prompts.
3415 Args:
3416 action: Action to perform: status (show current), trust (add operation), revoke_all (reset)
3417 operation: Operation to trust (required for 'trust' action)
3419 """
3420 output = []
3421 output.append("🔐 Claude Session Permissions Management")
3422 output.append("=" * 50)
3424 if action == "status":
3425 permissions_status = permissions_manager.get_permission_status()
3426 output.append(f"\n📊 Session ID: {permissions_status['session_id']}")
3427 output.append(f"📁 Permissions file: {permissions_status['permissions_file']}")
3428 output.append(
3429 f"✅ Trusted operations: {permissions_status['trusted_operations_count']}",
3430 )
3432 if permissions_status["trusted_operations"]:
3433 output.append("\n🔓 Currently trusted operations:")
3434 for op in permissions_status["trusted_operations"]:
3435 friendly_name = op.replace("_", " ").title()
3436 output.append(f" • {friendly_name}")
3438 output.append(
3439 "\n💡 These operations will not prompt for permission in future sessions",
3440 )
3441 else:
3442 output.append("\n⚠️ No operations are currently trusted")
3443 output.append(
3444 "💡 Operations will be automatically trusted on first successful use",
3445 )
3447 output.append("\n🛠️ Common Operations That Can Be Trusted:")
3448 output.append(" • UV Package Management - uv sync, pip operations")
3449 output.append(" • Git Repository Access - git status, commit, push")
3450 output.append(" • Project File Access - reading/writing project files")
3451 output.append(" • Subprocess Execution - running build tools, tests")
3452 output.append(" • Claude Directory Access - accessing ~/.claude/")
3454 elif action == "trust":
3455 if not operation:
3456 output.append("❌ Error: 'operation' parameter required for 'trust' action")
3457 output.append(
3458 "💡 Example: permissions with action='trust' and operation='uv_package_management'",
3459 )
3460 else:
3461 permissions_manager.trust_operation(
3462 operation,
3463 f"Manually trusted via MCP at {datetime.now()}",
3464 )
3465 output.append(
3466 f"✅ Operation trusted: {operation.replace('_', ' ').title()}",
3467 )
3468 output.append("🔓 This operation will no longer prompt for permission")
3470 elif action == "revoke_all":
3471 permissions_manager.revoke_all_permissions()
3472 output.append("🚨 All trusted permissions have been revoked")
3473 output.append("⚠️ All operations will now prompt for permission again")
3474 output.append(
3475 "💡 Use this for security reset or if permissions were granted incorrectly",
3476 )
3478 else:
3479 output.append(f"❌ Unknown action: {action}")
3480 output.append("💡 Valid actions: 'status', 'trust', 'revoke_all'")
3482 return "\n".join(output)
3485# Token optimization imports
3486try:
3487 from .token_optimizer import (
3488 get_token_optimizer,
3489 optimize_search_response,
3490 track_token_usage,
3491 )
3493 TOKEN_OPTIMIZER_AVAILABLE = True
3494except ImportError:
3495 TOKEN_OPTIMIZER_AVAILABLE = False
3498# Reflection Tools
3499@mcp.tool()
3500async def _optimize_search_results(
3501 results: list,
3502 optimize_tokens: bool,
3503 max_tokens: int,
3504 query: str,
3505) -> tuple[list, dict]:
3506 """Apply token optimization to search results if available."""
3507 optimization_info = {}
3509 if optimize_tokens and TOKEN_OPTIMIZER_AVAILABLE:
3510 try:
3511 results, optimization_info = await optimize_search_response(
3512 results,
3513 strategy="prioritize_recent",
3514 max_tokens=max_tokens,
3515 )
3517 # Track usage
3518 response_text = f"Found {len(results)} conversations"
3519 await track_token_usage(
3520 operation="reflect_on_past",
3521 request_tokens=get_token_optimizer().count_tokens(query),
3522 response_tokens=get_token_optimizer().count_tokens(response_text),
3523 optimization_applied=optimization_info.get("strategy"),
3524 )
3525 except Exception as e:
3526 session_logger.warning(f"Token optimization failed: {e}")
3528 return results, optimization_info
3531def _build_search_header(
3532 results: list,
3533 query: str,
3534 current_proj: str | None,
3535 optimization_info: dict,
3536) -> list[str]:
3537 """Build the header section of search results."""
3538 output = []
3539 output.append(f"🧠 Found {len(results)} relevant conversations for: '{query}'")
3540 if current_proj:
3541 output.append(f"📁 Project: {current_proj}")
3543 # Show optimization info if applied
3544 if optimization_info and optimization_info.get("strategy") != "none":
3545 savings = optimization_info.get("token_savings", {})
3546 if savings.get("tokens_saved", 0) > 0:
3547 output.append(
3548 f"⚡ Token optimization: {savings.get('savings_percentage', 0)}% saved",
3549 )
3551 output.append("=" * 50)
3552 return output
3555def _format_search_results(results: list) -> list[str]:
3556 """Format search results for display."""
3557 output = []
3559 for i, result in enumerate(results, 1):
3560 score_pct = result["score"] * 100
3561 timestamp = result.get("timestamp", "Unknown time")
3562 output.append(f"\n#{i} (Score: {score_pct:.1f}%)")
3563 output.append(f"📅 {timestamp}")
3564 output.append(f"💬 {result['content'][:200]}...")
3565 if result.get("project"):
3566 output.append(f"📁 Project: {result['project']}")
3568 return output
async def _perform_main_search(
    query: str,
    limit: int,
    min_score: float,
    current_proj: str | None,
) -> list:
    """Run the primary conversation search against the reflection database."""
    database = await get_reflection_database()
    search_args = {
        "query": query,
        "limit": limit,
        "min_score": min_score,
        "project": current_proj,
    }
    return await database.search_conversations(**search_args)
3587def _should_retry_search(error: Exception) -> bool:
3588 """Check if search error should trigger a retry."""
3589 error_str = str(error)
3590 return (
3591 "'NoneType' object has no attribute" in error_str
3592 or "Could not set lock" in error_str
3593 )
async def _retry_search_with_cleanup(
    query: str,
    limit: int,
    min_score: float,
    project: str | None,
) -> str:
    """Reset the reflection database, then run the search once more.

    Returns the formatted results, a "no conversations" message, or an
    error message if the retry itself fails; it does not raise.
    """
    try:
        from session_mgmt_mcp.reflection_tools import cleanup_reflection_database

        cleanup_reflection_database()
        database = await get_reflection_database()
        current_proj = project or get_current_project()

        matches = await database.search_conversations(
            query=query,
            limit=limit,
            min_score=min_score,
            project=current_proj,
        )

        if not matches:
            return f"🔍 No conversations found matching '{query}' (minimum similarity: {min_score})"

        lines = [
            f"🔍 Found {len(matches)} conversations matching '{query}'",
            f"📊 Project: {current_proj or 'All projects'}",
            f"🎯 Similarity threshold: {min_score}",
            "",
        ]

        for position, match in enumerate(matches, 1):
            similarity = match.get("score", 0)
            when = match.get("timestamp", "Unknown time")
            content = match.get("content", "")
            # Only mark the preview with an ellipsis when it was cut short.
            if len(content) > 200:
                preview = content[:200] + "..."
            else:
                preview = content

            lines += [
                f"📝 Result {position} (similarity: {similarity:.3f})",
                f"📅 {when}",
                f"💬 {preview}",
                "",
            ]

        return "\n".join(lines)
    except Exception as retry_e:
        return f"❌ Error searching conversations (retry failed): {retry_e}"
async def reflect_on_past(
    query: str,
    limit: int = 5,
    min_score: float = 0.7,
    project: str | None = None,
    optimize_tokens: bool = True,
    max_tokens: int = 4000,
) -> str:
    """Search past conversations and store reflections with semantic similarity.

    Returns the formatted matches, a hint when nothing matched, or an
    error message. Errors recognised as retryable trigger one retry after
    a database cleanup.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: pip install duckdb transformers"

    try:
        current_proj = project or get_current_project()
        matches = await _perform_main_search(query, limit, min_score, current_proj)

        if matches:
            # Apply token optimization if available
            matches, optimization_info = await _optimize_search_results(
                matches,
                optimize_tokens,
                max_tokens,
                query,
            )

            # Build and format output
            header = _build_search_header(
                matches,
                query,
                current_proj,
                optimization_info,
            )
            body = _format_search_results(matches)
            return "\n".join([*header, *body])

        return f"🔍 No relevant conversations found for query: '{query}'\n💡 Try adjusting the search terms or lowering min_score."

    except Exception as e:
        if not _should_retry_search(e):
            return f"❌ Error searching conversations: {e}"
        return await _retry_search_with_cleanup(query, limit, min_score, project)
def _stored_reflection_report(
    headline: str,
    reflection_id: Any,
    content: str,
    tags: list[str] | None,
) -> str:
    """Render the confirmation text shown after a reflection is stored."""
    lines = [
        headline,
        f"🆔 ID: {reflection_id}",
        f"📝 Content: {content[:100]}...",
    ]
    if tags:
        lines.append(f"🏷️ Tags: {', '.join(tags)}")
    lines.append(f"📅 Stored: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    return "\n".join(lines)


@mcp.tool()
async def store_reflection(content: str, tags: list[str] | None = None) -> str:
    """Store an important insight or reflection for future reference.

    If the first attempt fails with the lost-connection error, the database
    is cleaned up and the store is retried once. All failures are returned
    as error text, not raised.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: pip install duckdb transformers"

    try:
        database = await get_reflection_database()
        new_id = await database.store_reflection(content, tags)
        return _stored_reflection_report(
            "💾 Reflection stored successfully!",
            new_id,
            content,
            tags,
        )

    except Exception as e:
        # Anything other than a dropped connection is reported as-is.
        if "'NoneType' object has no attribute 'execute'" not in str(e):
            return f"❌ Error storing reflection: {e}"

        # If connection failed, try to cleanup and retry once
        try:
            from session_mgmt_mcp.reflection_tools import (
                cleanup_reflection_database,
            )

            cleanup_reflection_database()
            database = await get_reflection_database()
            new_id = await database.store_reflection(content, tags)
            return _stored_reflection_report(
                "💾 Reflection stored successfully! (after connection reset)",
                new_id,
                content,
                tags,
            )
        except Exception as retry_e:
            return f"❌ Error storing reflection (retry failed): {retry_e}"
@mcp.tool()
async def quick_search(
    query: str,
    min_score: float = 0.7,
    project: str | None = None,
) -> str:
    """Quick search that returns only the count and top result for fast overview.

    Args:
        query: Text to search conversations for.
        min_score: Score threshold used for the top result and for the
            "high relevance" count.
        project: Project to search in; when falsy, the value returned by
            get_current_project() is used instead.

    Returns:
        A multi-line overview (match counts plus the best hit, if any), or an
        error message if the search raised.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: pip install duckdb transformers"

    try:
        db = await get_reflection_database()
        current_proj = project or get_current_project()

        # First query: just the single best hit at the caller's threshold.
        top_hits = await db.search_conversations(
            query=query,
            limit=1,
            min_score=min_score,
            project=current_proj,
        )

        # Second query: a wider net (fixed 0.3 threshold, up to 100 rows) used
        # only for counting.
        broad_hits = await db.search_conversations(
            query=query,
            limit=100,
            min_score=0.3,
            project=current_proj,
        )
        high_relevance = sum(1 for hit in broad_hits if hit["score"] >= min_score)

        lines = [f"⚡ Quick search: '{query}'"]
        if current_proj:
            lines.append(f"📁 Project: {current_proj}")
        lines.append(f"📊 Total matches: {len(broad_hits)} (threshold: 0.3)")
        lines.append(f"🎯 High relevance: {high_relevance}")

        if not top_hits:
            lines.append(
                f"\n💡 No high-relevance matches found (min_score: {min_score})",
            )
        else:
            best = top_hits[0]
            lines.append(f"\n🥇 Top result (Score: {best['score'] * 100:.1f}%):")
            lines.append(f"💬 {best['content'][:150]}...")
            lines.append(f"📅 {best.get('timestamp', 'Unknown time')}")

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Error in quick search: {e}"
@mcp.tool()
async def search_summary(
    query: str,
    min_score: float = 0.7,
    project: str | None = None,
) -> str:
    """Get aggregated insights from search results without individual result details.

    Args:
        query: Text to search conversations for.
        min_score: Minimum score a conversation must reach to be included.
        project: Project to search in; when falsy, the value returned by
            get_current_project() is used instead.

    Returns:
        A multi-line summary (match count, average relevance, projects and
        repeated keywords), a "no relevant conversations" message when the
        search is empty, or an error message if the search raised.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: pip install duckdb transformers"

    # Function-scope import keeps this block self-contained.
    from collections import Counter

    try:
        db = await get_reflection_database()
        current_proj = project or get_current_project()

        results = await db.search_conversations(
            query=query,
            limit=10,
            min_score=min_score,
            project=current_proj,
        )

        if not results:
            return f"📊 Search Summary: No relevant conversations found for '{query}'"

        total_results = len(results)
        avg_score = sum(r["score"] for r in results) / total_results
        # Sorted so the output order is deterministic (a bare set is not).
        projects = sorted({r.get("project") for r in results if r.get("project")})

        # Simple keyword analysis: count lower-cased whitespace-separated
        # tokens longer than four characters across all matched contents.
        word_counts = Counter(
            word
            for r in results
            for word in r["content"].lower().split()
            if len(word) > 4
        )
        # Only words that occur more than once count as a theme; filtering
        # before printing avoids a header with nothing underneath it.
        repeated_themes = [
            (word, count) for word, count in word_counts.most_common(5) if count > 1
        ]

        output = [
            f"📊 Search Summary: '{query}'",
            "=" * 40,
            f"📈 Total matches: {total_results}",
            f"🎯 Average relevance: {avg_score * 100:.1f}%",
        ]
        if projects:
            output.append(f"📁 Projects involved: {', '.join(projects)}")

        if repeated_themes:
            output.append("\n🔑 Common themes:")
            output.extend(
                f" • {word} ({count} mentions)" for word, count in repeated_themes
            )

        return "\n".join(output)

    except Exception as e:
        return f"❌ Error generating search summary: {e}"
@mcp.tool()
async def get_more_results(
    query: str,
    offset: int = 3,
    limit: int = 3,
    project: str | None = None,
) -> str:
    """Get additional search results after an initial search (pagination support).

    Args:
        query: Text to search conversations for.
        offset: Number of leading results to skip; negative values are
            treated as 0.
        limit: Maximum number of results to return after the offset.
        project: Project to search in; when falsy, the value returned by
            get_current_project() is used instead.

    Returns:
        A multi-line listing of the requested page, a "no more results"
        message when the page is empty, or an error message if the search
        raised.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: pip install duckdb transformers"

    # A negative offset would make the slice below count from the end of the
    # result list and produce wrong position labels, so clamp it.
    offset = max(offset, 0)

    try:
        db = await get_reflection_database()
        current_proj = project or get_current_project()

        # Fetch enough rows to cover the skipped prefix plus the page itself.
        all_results = await db.search_conversations(
            query=query,
            limit=offset + limit,
            min_score=0.5,
            project=current_proj,
        )

        paginated_results = all_results[offset : offset + limit]

        if not paginated_results:
            return f"📄 No more results found starting from position {offset + 1}"

        output = [
            f"📄 Additional results for: '{query}' (positions {offset + 1}-{offset + len(paginated_results)})",
        ]
        if current_proj:
            output.append(f"📁 Project: {current_proj}")
        output.append("=" * 50)

        # Positions are 1-based and continue after the skipped prefix.
        for position, result in enumerate(paginated_results, offset + 1):
            score_pct = result["score"] * 100
            timestamp = result.get("timestamp", "Unknown time")
            output.append(f"\n#{position} (Score: {score_pct:.1f}%)")
            output.append(f"📅 {timestamp}")
            output.append(f"💬 {result['content'][:200]}...")

        return "\n".join(output)

    except Exception as e:
        return f"❌ Error getting more results: {e}"
@mcp.tool()
async def search_by_file(
    file_path: str,
    limit: int = 10,
    project: str | None = None,
) -> str:
    """Search for conversations that analyzed a specific file.

    Args:
        file_path: Path of the file to look up.
        limit: Maximum number of conversations to return.
        project: Project to search in; when falsy, the value returned by
            get_current_project() is used instead.

    Returns:
        A multi-line listing of matching conversations, a "no conversations
        found" message, or an error message if the search raised.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: pip install duckdb transformers"

    try:
        db = await get_reflection_database()
        current_proj = project or get_current_project()

        matches = await db.search_by_file(
            file_path=file_path,
            limit=limit,
            project=current_proj,
        )
        if not matches:
            return f"🔍 No conversations found mentioning file: {file_path}"

        lines = [f"📁 Found {len(matches)} conversations about: {file_path}"]
        if current_proj:
            lines.append(f"🗂️ Project: {current_proj}")
        lines.append("=" * 60)

        for number, match in enumerate(matches, 1):
            lines.extend(
                (
                    f"\n#{number}",
                    f"📅 {match.get('timestamp', 'Unknown time')}",
                    f"💬 {match['content'][:250]}...",
                ),
            )

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Error searching by file: {e}"
@mcp.tool()
async def search_by_concept(
    concept: str,
    include_files: bool = True,
    limit: int = 10,
    project: str | None = None,
) -> str:
    """Search for conversations about a specific development concept.

    Args:
        concept: Concept text used as the search query.
        include_files: Accepted for interface compatibility.
            NOTE(review): not read anywhere in this function.
        limit: Maximum number of conversations requested from the database
            (at most five of them are printed).
        project: Project to search conversations in; when falsy, the value
            returned by get_current_project() is used instead.

    Returns:
        A multi-line listing of matching conversations and reflections, or an
        error message if either search raised.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: pip install duckdb transformers"

    try:
        db = await get_reflection_database()
        current_proj = project or get_current_project()

        # Conversations are searched within the project; the reflection
        # search is called without a project argument.
        conversations = await db.search_conversations(
            query=concept,
            limit=limit,
            min_score=0.6,
            project=current_proj,
        )
        reflections = await db.search_reflections(
            query=concept,
            limit=5,
            min_score=0.6,
        )

        lines = [f"🧠 Concept search: '{concept}'"]
        if current_proj:
            lines.append(f"📁 Project: {current_proj}")
        lines.append("=" * 50)

        if conversations:
            lines.append(f"\n💬 Conversations ({len(conversations)}):")
            for number, hit in enumerate(conversations[:5], 1):
                score_pct = hit["score"] * 100
                lines.append(
                    f"#{number} (Score: {score_pct:.1f}%) {hit['content'][:150]}...",
                )

        if reflections:
            lines.append(f"\n💭 Reflections ({len(reflections)}):")
            for number, hit in enumerate(reflections, 1):
                score_pct = hit["score"] * 100
                if hit["tags"]:
                    tags_str = f" [{', '.join(hit['tags'])}]"
                else:
                    tags_str = ""
                lines.append(
                    f"#{number} (Score: {score_pct:.1f}%){tags_str} {hit['content'][:150]}...",
                )

        if not (conversations or reflections):
            lines.append(f"🔍 No relevant content found for concept: '{concept}'")
            lines.append(
                "💡 Try related terms or check if conversations about this topic exist",
            )

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Error searching by concept: {e}"
@mcp.tool()
async def reset_reflection_database() -> str:
    """Reset the reflection database connection to fix lock issues.

    Returns:
        A success message including the stats reported by the re-opened
        database, or an error message if the reset raised.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: pip install duckdb transformers"

    try:
        from session_mgmt_mcp.reflection_tools import (
            cleanup_reflection_database as _drop_connection,
        )
        from session_mgmt_mcp.reflection_tools import (
            get_reflection_database as _open_connection,
        )

        # Discard the existing instance, then fetch stats through a fresh one
        # to confirm the new connection actually works.
        _drop_connection()
        fresh_db = await _open_connection()
        stats = await fresh_db.get_stats()
    except Exception as e:
        return f"❌ Error resetting database: {e}"

    return f"✅ Reflection database reset successfully!\n📊 Stats: {stats}"
@mcp.tool()
async def reflection_stats() -> str:
    """Get statistics about the reflection database.

    Returns:
        A multi-line statistics report, the database's own error text when
        the stats contain an "error" entry, or an error message if fetching
        the stats raised.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: pip install duckdb transformers"

    try:
        db = await get_reflection_database()
        stats = await db.get_stats()

        if "error" in stats:
            return f"❌ {stats['error']}"

        lines = [
            "📊 Reflection Database Statistics",
            "=" * 40,
            f"💬 Conversations: {stats['conversations_count']}",
            f"💭 Reflections: {stats['reflections_count']}",
            f"🧠 Embedding provider: {stats['embedding_provider']}",
            f"📏 Embedding dimension: {stats['embedding_dimension']}",
            f"💾 Database: {stats['database_path']}",
        ]

        total_items = stats["conversations_count"] + stats["reflections_count"]
        lines.append(f"\n📈 Total stored items: {total_items}")
        lines.append(
            "✅ Memory system is active and contains data"
            if total_items > 0
            else "💡 Memory system is ready but contains no data yet",
        )

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Error getting reflection stats: {e}"
4064# Token Optimization Tools
@mcp.tool()
async def get_cached_chunk(cache_key: str, chunk_index: int) -> str:
    """Get a specific chunk from cached chunked response.

    Args:
        cache_key: Key identifying the cached chunked response.
        chunk_index: Index of the chunk to retrieve.

    Returns:
        A multi-line rendering of the chunk's items, a "chunk not found"
        message, or an error message if retrieval raised.
    """
    if not TOKEN_OPTIMIZER_AVAILABLE:
        return "❌ Token optimizer not available. Install dependencies: pip install tiktoken"

    try:
        # Aliased because the helper has the same name as this tool function.
        from .token_optimizer import get_cached_chunk as _load_chunk

        chunk_data = await _load_chunk(cache_key, chunk_index)
        if not chunk_data:
            return f"❌ Chunk not found. Cache key '{cache_key}' or chunk {chunk_index} may have expired."

        lines = [
            f"📄 Chunk {chunk_data['current_chunk']} of {chunk_data['total_chunks']}",
            "=" * 50,
        ]

        for number, item in enumerate(chunk_data["chunk"], 1):
            lines.extend(
                (
                    f"\n#{number}",
                    f"📅 {item.get('timestamp', 'Unknown time')}",
                    f"💬 {item.get('content', '')[:200]}...",
                ),
            )

        if chunk_data["has_more"]:
            lines.append(
                f"\n💡 More chunks available. Use: get_cached_chunk('{cache_key}', {chunk_data['current_chunk'] + 1})",
            )

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Error retrieving cached chunk: {e}"
@mcp.tool()
async def get_token_usage_stats(hours: int = 24) -> str:
    """Get token usage statistics and optimization metrics.

    Args:
        hours: Size of the look-back window, in hours.

    Returns:
        A multi-line usage report, a "no data" message when the stats report
        status "no_data", or an error message if retrieval raised.
    """
    if not TOKEN_OPTIMIZER_AVAILABLE:
        return "❌ Token optimizer not available. Install dependencies: pip install tiktoken"

    try:
        # Aliased because the helper has the same name as this tool function.
        from .token_optimizer import get_token_usage_stats as _collect_stats

        stats = await _collect_stats(hours)
        if stats["status"] == "no_data":
            return f"📊 No token usage data available for the last {hours} hours"

        lines = [
            f"📊 Token Usage Statistics (Last {hours} hours)",
            "=" * 50,
            f"📈 Total Requests: {stats['total_requests']}",
            f"🔤 Total Tokens Used: {stats['total_tokens']:,}",
            f"📊 Average Tokens per Request: {stats['average_tokens_per_request']}",
        ]

        if stats.get("optimizations_applied"):
            lines.append("\n⚡ Optimizations Applied:")
            lines.extend(
                f" • {strategy}: {count} times"
                for strategy, count in stats["optimizations_applied"].items()
            )

        # The savings section is shown only when a positive amount is reported.
        cost_savings = stats.get("estimated_cost_savings", {})
        if cost_savings.get("savings_usd", 0) > 0:
            lines.extend(
                (
                    "\n💰 Estimated Cost Savings:",
                    f" • ${cost_savings['savings_usd']:.4f} USD saved",
                    f" • {cost_savings['estimated_tokens_saved']:,} tokens saved",
                    f" • {cost_savings['requests_optimized']} optimized requests",
                ),
            )

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Error getting token usage stats: {e}"
@mcp.tool()
async def optimize_memory_usage(
    strategy: str = "auto",
    max_age_days: int = 30,
    dry_run: bool = True,
) -> str:
    """Optimize memory usage by consolidating old conversations.

    Args:
        strategy: "auto" passes no explicit policy to the optimizer; any
            other value builds one, with "aggressive" using a 0.3 importance
            threshold and everything else 0.5.
        max_age_days: Consolidation age used in the explicit policy (not
            read when strategy is "auto").
        dry_run: When true, the optimizer is asked for a dry run.

    Returns:
        A multi-line optimization report, or an error message.
    """
    if not (TOKEN_OPTIMIZER_AVAILABLE and REFLECTION_TOOLS_AVAILABLE):
        return (
            "❌ Memory optimization requires both token optimizer and reflection tools"
        )

    try:
        from .memory_optimizer import MemoryOptimizer

        db = await get_reflection_database()
        optimizer = MemoryOptimizer(db)

        # Build an explicit retention policy only for non-"auto" strategies.
        if strategy == "auto":
            policy = None
        else:
            threshold = 0.3 if strategy == "aggressive" else 0.5
            policy = {
                "consolidation_age_days": max_age_days,
                "importance_threshold": threshold,
            }

        results = await optimizer.compress_memory(policy=policy, dry_run=dry_run)

        if results.get("error"):
            return f"❌ Memory optimization error: {results['error']}"

        mode_suffix = "(DRY RUN)" if dry_run else ""
        lines = [
            f"🧠 Memory Optimization Results {mode_suffix}",
            "=" * 50,
            f"📊 Total Conversations: {results['total_conversations']}",
            f"✅ Conversations to Keep: {results['conversations_to_keep']}",
            f"📦 Conversations to Consolidate: {results['conversations_to_consolidate']}",
            f"🔗 Clusters Created: {results['clusters_created']}",
        ]

        if results.get("space_saved_estimate", 0) > 0:
            lines.append("\n💾 Space Optimization:")
            lines.append(f" • {results['space_saved_estimate']:,} characters saved")
            lines.append(f" • {results['compression_ratio']:.1%} compression ratio")

        if results.get("consolidated_summaries"):
            lines.append("\n📝 Consolidation Preview:")
            # Preview only the first three clusters.
            for number, summary in enumerate(results["consolidated_summaries"][:3], 1):
                lines.append(
                    f" #{number}: {summary['original_count']} conversations → 1 summary",
                )
                lines.append(f" Projects: {', '.join(summary['projects'][:2])}")
                lines.append(f" Summary: {summary['summary'][:100]}...")

        if dry_run:
            lines.append("\n💡 Run with dry_run=False to apply changes")

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Error optimizing memory: {e}"
4214# Enhanced Search Tools (Phase 1)
@mcp.tool()
async def search_code(
    query: str,
    pattern_type: str | None = None,
    limit: int = 10,
    project: str | None = None,
) -> str:
    """Search for code patterns in conversations using AST parsing.

    Args:
        query: Search query for code patterns
        pattern_type: Type of code pattern (function, class, import, assignment, call, loop, conditional, try, async)
        limit: Maximum number of results to return
        project: Optional project filter; when given, only results whose
            "project" entry equals it are reported. The filter is applied
            after the search, so fewer than ``limit`` results may remain.

    Returns:
        A multi-line listing of matching code patterns, a "no code patterns
        found" message, or an error message if the search raised.
    """
    if not ENHANCED_SEARCH_AVAILABLE or not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Enhanced search or reflection tools not available"

    try:
        db = await get_reflection_database()
        search_engine = EnhancedSearchEngine(db)

        results = await search_engine.search_code_patterns(query, pattern_type, limit)

        # The search call above takes no project argument, so honour the
        # documented filter here instead of silently ignoring it.
        if project and results:
            results = [r for r in results if r.get("project") == project]

        if not results:
            return f"🔍 No code patterns found for query: '{query}'"

        output = [f"🔍 Code Pattern Search: '{query}'"]
        if pattern_type:
            output.append(f"📝 Pattern type: {pattern_type}")
        output.append("=" * 50)

        for i, result in enumerate(results, 1):
            pattern = result["pattern"]
            relevance_pct = result["relevance"] * 100

            output.append(f"\n#{i} (Relevance: {relevance_pct:.1f}%)")
            output.append(f"📁 Project: {result['project']}")
            output.append(f"🕒 Time: {result['timestamp']}")
            output.append(f"📋 Pattern: {pattern['type']}")

            if "name" in pattern:
                output.append(f"🏷️ Name: {pattern['name']}")

            # Show at most the first three lines of the snippet; split once
            # and reuse the result for the truncation marker.
            output.append("💻 Code snippet:")
            snippet_lines = pattern["content"].split("\n")
            output.extend(f" {line}" for line in snippet_lines[:3])
            if len(snippet_lines) > 3:
                output.append(" ...")

        return "\n".join(output)

    except Exception as e:
        return f"❌ Error searching code patterns: {e}"
@mcp.tool()
async def search_errors(
    query: str,
    error_type: str | None = None,
    limit: int = 10,
    project: str | None = None,
) -> str:
    """Search for error patterns and debugging contexts in conversations.

    Args:
        query: Search query for error patterns
        error_type: Type of error (python_traceback, python_exception, javascript_error, etc.)
        limit: Maximum number of results to return
        project: Optional project filter; when given, only results whose
            "project" entry equals it are reported. The filter is applied
            after the search, so fewer than ``limit`` results may remain.

    Returns:
        A multi-line listing of matching error patterns, a "no error patterns
        found" message, or an error message if the search raised.
    """
    if not ENHANCED_SEARCH_AVAILABLE or not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Enhanced search or reflection tools not available"

    try:
        db = await get_reflection_database()
        search_engine = EnhancedSearchEngine(db)

        results = await search_engine.search_error_patterns(query, error_type, limit)

        # The search call above takes no project argument, so honour the
        # documented filter here instead of silently ignoring it.
        if project and results:
            results = [r for r in results if r.get("project") == project]

        if not results:
            return f"🔍 No error patterns found for query: '{query}'"

        output = [f"🚨 Error Pattern Search: '{query}'"]
        if error_type:
            output.append(f"⚠️ Error type: {error_type}")
        output.append("=" * 50)

        for i, result in enumerate(results, 1):
            pattern = result["pattern"]
            relevance_pct = result["relevance"] * 100

            output.append(f"\n#{i} (Relevance: {relevance_pct:.1f}%)")
            output.append(f"📁 Project: {result['project']}")
            output.append(f"🕒 Time: {result['timestamp']}")
            output.append(f"🚨 Pattern: {pattern['type']} - {pattern['subtype']}")

            # For "error" patterns, the first captured group is shown as the
            # error and the second (if present) as its message. Both reads
            # sit under the non-empty check, so a missing or empty "groups"
            # entry prints nothing instead of raising.
            groups = pattern.get("groups")
            if pattern["type"] == "error" and groups:
                output.append(f"💀 Error: {groups[0]}")
                if len(groups) > 1:
                    output.append(f"📝 Message: {groups[1]}")

            # Show relevant snippet
            snippet = result["snippet"]
            output.append(f"📄 Context: {snippet[:200]}...")

        return "\n".join(output)

    except Exception as e:
        return f"❌ Error searching error patterns: {e}"
@mcp.tool()
async def search_temporal(
    time_expression: str,
    query: str | None = None,
    limit: int = 10,
    project: str | None = None,
) -> str:
    """Search conversations within a specific time range using natural language.

    Args:
        time_expression: Natural language time expression (e.g., "yesterday", "last week", "2 days ago")
        query: Optional search query to filter results within the time range
        limit: Maximum number of results to return
        project: Optional project filter; when given, only results whose
            "project" entry equals it are reported. The filter is applied
            after the search, so fewer than ``limit`` results may remain.

    Returns:
        A multi-line listing of matching conversations, a "no conversations
        found" message, the engine's own error text when it returns a single
        error entry, or an error message if the search raised.
    """
    if not ENHANCED_SEARCH_AVAILABLE or not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Enhanced search or reflection tools not available"

    try:
        db = await get_reflection_database()
        search_engine = EnhancedSearchEngine(db)

        results = await search_engine.search_temporal(time_expression, query, limit)

        if not results:
            return f"🔍 No conversations found for time: '{time_expression}'"

        # A lone entry with an "error" key is the engine's way of reporting a
        # failure; surface it before any filtering.
        if len(results) == 1 and "error" in results[0]:
            return f"❌ {results[0]['error']}"

        # The search call above takes no project argument, so honour the
        # documented filter here instead of silently ignoring it.
        if project:
            results = [r for r in results if r.get("project") == project]
            if not results:
                return f"🔍 No conversations found for time: '{time_expression}'"

        output = [f"🕒 Temporal Search: '{time_expression}'"]
        if query:
            output.append(f"🔍 Query: '{query}'")
        output.append("=" * 50)

        for i, result in enumerate(results, 1):
            # Results without a relevance entry are shown as 100%.
            relevance_pct = result.get("relevance", 1.0) * 100

            output.append(f"\n#{i} (Relevance: {relevance_pct:.1f}%)")
            output.append(f"📁 Project: {result['project']}")
            output.append(f"🕒 Time: {result['timestamp']}")
            output.append(f"💬 Content: {result['content']}")

        return "\n".join(output)

    except Exception as e:
        return f"❌ Error in temporal search: {e}"
@mcp.tool()
async def auto_load_context(
    working_directory: str | None = None,
    max_conversations: int = 10,
    min_relevance: float = 0.3,
) -> str:
    """Automatically load relevant conversations based on current development context.

    Args:
        working_directory: Optional working directory override
        max_conversations: Maximum number of conversations to load
        min_relevance: Minimum relevance score (0.0-1.0)

    Returns:
        A multi-line report of the detected context, the loaded
        conversations and recent files, or an error message if loading
        raised.
    """
    if not (AUTO_CONTEXT_AVAILABLE and REFLECTION_TOOLS_AVAILABLE):
        return "❌ Auto-context loading or reflection tools not available"

    try:
        db = await get_reflection_database()
        loader = AutoContextLoader(db)

        loaded = await loader.load_relevant_context(
            working_directory,
            max_conversations,
            min_relevance,
        )
        context = loaded["context"]
        conversations = loaded["relevant_conversations"]

        lines = [
            "🤖 Auto-Context Loading Results",
            "=" * 50,
            # --- detected context -------------------------------------
            "\n📋 Detected Context:",
            f"📁 Project: {context['project_name']}",
        ]

        if context["detected_languages"]:
            lines.append(f"💻 Languages: {', '.join(context['detected_languages'])}")

        if context["detected_tools"]:
            lines.append(f"🔧 Tools: {', '.join(context['detected_tools'])}")

        if context["project_type"]:
            # e.g. "web_app" is displayed as "Web App".
            pretty_type = context["project_type"].replace("_", " ").title()
            lines.append(f"📋 Type: {pretty_type}")

        lines.append(
            f"🎯 Detection confidence: {context['confidence_score'] * 100:.0f}%",
        )

        # --- git ------------------------------------------------------
        git_info = context["git_info"]
        if git_info.get("is_git_repo"):
            branch = git_info.get("current_branch", "unknown")
            platform = git_info.get("platform", "git")
            lines.append(f"🌿 Git: {branch} branch on {platform}")

        # --- conversations --------------------------------------------
        lines.append(
            f"\n💬 Loaded Conversations: {loaded['loaded_count']}/{loaded['total_found']} found",
        )
        if not conversations:
            lines.append(
                f"🔍 No conversations found above {min_relevance:.1f} relevance threshold",
            )
            lines.append(
                "💡 Try lowering min_relevance or working on this project more",
            )
        else:
            for number, conv in enumerate(conversations, 1):
                lines.append(
                    f"\n#{number} (Relevance: {conv['relevance_score'] * 100:.1f}%)",
                )
                lines.append(f"📁 Project: {conv['project']}")
                lines.append(f"🕒 Time: {conv['timestamp']}")

                # Truncate long content to 200 characters with an ellipsis.
                body = conv["content"]
                preview = body[:200] + "..." if len(body) > 200 else body[:200]
                lines.append(f"💬 Content: {preview}")

        # --- recent files ---------------------------------------------
        recent_files = context["recent_files"]
        if recent_files:
            lines.append(
                f"\n📄 Recent activity: {len(recent_files)} files modified in last 2 hours",
            )
            # List at most three of them.
            lines.extend(f" 📝 {entry['path']}" for entry in recent_files[:3])

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Error in auto-context loading: {e}"
@mcp.tool()
async def get_context_summary(working_directory: str | None = None) -> str:
    """Get a summary of current development context without loading conversations.

    Args:
        working_directory: Optional working directory override

    Returns:
        A multi-line context summary, or an error message if detection
        raised.
    """
    if not AUTO_CONTEXT_AVAILABLE:
        return "❌ Auto-context loading tools not available"

    try:
        db = None
        if REFLECTION_TOOLS_AVAILABLE:
            db = await get_reflection_database()
        loader = AutoContextLoader(db) if db else None

        if loader:
            summary = await loader.get_context_summary(working_directory)
        else:
            # Without a database, fall back to plain context detection.
            from session_mgmt_mcp.context_manager import ContextDetector

            context = ContextDetector().detect_current_context(working_directory)

            parts = [
                f"📁 Project: {context['project_name']}",
                f"📂 Directory: {context['working_directory']}",
            ]
            if context["detected_languages"]:
                parts.append(
                    f"💻 Languages: {', '.join(context['detected_languages'])}",
                )
            if context["detected_tools"]:
                parts.append(f"🔧 Tools: {', '.join(context['detected_tools'])}")
            parts.append(
                f"🎯 Detection confidence: {context['confidence_score'] * 100:.0f}%",
            )
            summary = "\n".join(parts)

        return "\n".join(
            ["📋 Current Development Context", "=" * 40, summary],
        )

    except Exception as e:
        return f"❌ Error getting context summary: {e}"
@mcp.tool()
async def compress_memory(
    max_age_days: int = 30,
    max_conversations: int = 10000,
    importance_threshold: float = 0.3,
    dry_run: bool = False,
) -> str:
    """Compress conversation memory by consolidating old conversations.

    Args:
        max_age_days: Consolidate conversations older than this many days
        max_conversations: Maximum total conversations to keep
        importance_threshold: Minimum importance score to avoid consolidation (0.0-1.0)
        dry_run: Preview changes without actually applying them. Defaults to
            False, so calling without arguments applies the compression.

    Returns:
        A multi-line compression report, the optimizer's own error text when
        its result contains an "error" entry, or an error message if the
        run raised.
    """
    if not MEMORY_OPTIMIZER_AVAILABLE or not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Memory optimizer or reflection tools not available"

    try:
        db = await get_reflection_database()
        optimizer = MemoryOptimizer(db)

        policy = {
            "consolidation_age_days": max_age_days,
            "max_conversations": max_conversations,
            "importance_threshold": importance_threshold,
        }

        result = await optimizer.compress_memory(policy, dry_run)

        if "error" in result:
            return f"❌ {result['error']}"

        output = ["🗜️ Memory Compression Results", "=" * 50]

        if dry_run:
            output.append("🔍 DRY RUN - No changes made")

        output.append("\n📊 Overview:")
        output.append(f"📁 Total conversations: {result['total_conversations']}")
        output.append(f"✅ Conversations to keep: {result['conversations_to_keep']}")
        output.append(
            f"🗜️ Conversations to consolidate: {result['conversations_to_consolidate']}",
        )
        output.append(f"📦 Clusters created: {result['clusters_created']}")

        if result["space_saved_estimate"] > 0:
            space_mb = result["space_saved_estimate"] / (1024 * 1024)
            compression_pct = result["compression_ratio"] * 100
            output.append(
                f"💾 Space saved: {space_mb:.2f}MB ({compression_pct:.1f}% reduction)",
            )

        summaries = result["consolidated_summaries"]
        if summaries:
            output.append("\n📋 Consolidation Details:")
            # Show only the first five clusters.
            for i, summary in enumerate(summaries[:5], 1):
                original_size = summary["original_size"]
                compressed_size = summary["compressed_size"]
                # Guard the ratio: a zero-size cluster would otherwise raise
                # ZeroDivisionError here, after a non-dry run has already
                # been applied, and the caller would see an error.
                if original_size:
                    reduction_pct = (1 - compressed_size / original_size) * 100
                else:
                    reduction_pct = 0.0

                output.append(f"\n#{i} Cluster:")
                output.append(
                    f" 📁 Projects: {', '.join(summary['projects']) if summary['projects'] else 'Multiple'}",
                )
                output.append(f" 💬 Conversations: {summary['original_count']}")
                output.append(
                    f" 📏 Size: {original_size / 1024:.1f}KB → {compressed_size / 1024:.1f}KB ({reduction_pct:.1f}% reduction)",
                )
                output.append(f" 🕒 Time range: {summary['time_range']}")
                output.append(f" 📝 Summary: {summary['summary'][:100]}...")

        # Closing hint: dry runs always point at dry_run=False; real runs
        # report success or that nothing needed consolidating.
        if dry_run:
            output.append("\n💡 Run with dry_run=False to apply these changes")
        elif summaries:
            output.append("\n✅ Memory compression completed successfully!")
        else:
            output.append(
                "\n💡 No consolidation needed - all conversations are recent or important",
            )

        return "\n".join(output)

    except Exception as e:
        return f"❌ Error compressing memory: {e}"
@mcp.tool()
async def get_compression_stats() -> str:
    """Get memory compression statistics and history.

    Returns:
        A multi-line report with the last compression run (if any) and, when
        the database object exposes a truthy ``conn``, current totals read
        directly from the ``conversations`` table; or an error message if
        anything raised.
    """
    if not (MEMORY_OPTIMIZER_AVAILABLE and REFLECTION_TOOLS_AVAILABLE):
        return "❌ Memory optimizer or reflection tools not available"

    try:
        db = await get_reflection_database()
        stats = await MemoryOptimizer(db).get_compression_stats()

        lines = ["📊 Memory Compression Statistics", "=" * 40]

        if not stats["last_run"]:
            lines.append("💡 No compression runs performed yet")
            lines.append(
                "🔧 Use compress_memory() to start optimizing your conversation storage",
            )
        else:
            lines.append(f"🕒 Last compression: {stats['last_run']}")
            lines.append(
                f"💬 Conversations processed: {stats['conversations_processed']}",
            )
            lines.append(
                f"📦 Conversations consolidated: {stats['conversations_consolidated']}",
            )

            if stats["space_saved_bytes"] > 0:
                saved_mb = stats["space_saved_bytes"] / (1024 * 1024)
                ratio_pct = stats["compression_ratio"] * 100
                lines.append(f"💾 Space saved: {saved_mb:.2f}MB")
                lines.append(f"🗜️ Compression ratio: {ratio_pct:.1f}%")

        # Current database stats, queried straight from the connection.
        conn = getattr(db, "conn", None)
        if conn:

            def _first_value(sql: str) -> Any:
                # Run a query and return the first column of the first row.
                return conn.execute(sql).fetchone()[0]

            total_conversations = _first_value("SELECT COUNT(*) FROM conversations")
            # SUM over an empty table yields NULL, hence the "or 0".
            total_size = (
                _first_value("SELECT SUM(LENGTH(content)) FROM conversations") or 0
            )

            lines.append("\n📈 Current Database:")
            lines.append(f"💬 Total conversations: {total_conversations}")
            lines.append(f"💾 Total size: {total_size / (1024 * 1024):.2f}MB")

            # Rows whose metadata text contains "consolidated".
            consolidated_count = _first_value(
                "SELECT COUNT(*) FROM conversations WHERE metadata LIKE '%consolidated%'",
            )
            if consolidated_count > 0:
                lines.append(f"📦 Consolidated conversations: {consolidated_count}")

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Error getting compression stats: {e}"
@mcp.tool()
async def set_retention_policy(
    max_age_days: int = 365,
    max_conversations: int = 10000,
    importance_threshold: float = 0.3,
    consolidation_age_days: int = 30,
) -> str:
    """Set memory retention policy parameters.

    Args:
        max_age_days: Maximum age in days to keep conversations
        max_conversations: Maximum total conversations to store
        importance_threshold: Minimum importance score to retain (0.0-1.0)
        consolidation_age_days: Age in days after which to consolidate conversations

    Returns:
        A multi-line confirmation echoing the policy reported back by the
        optimizer, the optimizer's own error text when its result contains
        an "error" entry, or an error message if the update raised.
    """
    if not MEMORY_OPTIMIZER_AVAILABLE:
        return "❌ Memory optimizer not available"

    try:
        # A throwaway optimizer instance is enough to set the policy.
        from session_mgmt_mcp.memory_optimizer import MemoryOptimizer

        db = None
        if REFLECTION_TOOLS_AVAILABLE:
            db = await get_reflection_database()
        # Any falsy database handle is normalised to None.
        optimizer = MemoryOptimizer(db if db else None)

        requested = {
            "max_age_days": max_age_days,
            "max_conversations": max_conversations,
            "importance_threshold": importance_threshold,
            "consolidation_age_days": consolidation_age_days,
        }
        result = await optimizer.set_retention_policy(requested)

        if "error" in result:
            return f"❌ {result['error']}"

        # Report what the optimizer says it stored, not what was requested.
        applied = result["updated_policy"]
        lines = [
            "⚙️ Retention Policy Updated",
            "=" * 40,
            f"📅 Max age: {applied['max_age_days']} days",
            f"💬 Max conversations: {applied['max_conversations']}",
            f"⭐ Importance threshold: {applied['importance_threshold']:.1f}",
            f"📦 Consolidation age: {applied['consolidation_age_days']} days",
            f"🗜️ Target compression: {applied.get('compression_ratio', 0.5) * 100:.0f}%",
            "\n💡 These settings will be applied on the next compression run",
        ]
        return "\n".join(lines)

    except Exception as e:
        return f"❌ Error setting retention policy: {e}"
@mcp.prompt("compress-memory")
async def get_compress_memory_prompt() -> str:
    """Compress conversation memory by consolidating old conversations into summaries."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Memory Compression",
        "",
        "Compress conversation memory by consolidating old conversations into summaries.",
        "",
        "This command will:",
        "- Analyze conversation age and importance",
        "- Group related conversations into clusters",
        "- Create consolidated summaries of old conversations",
        "- Remove redundant conversation data",
        "- Calculate space savings and compression ratios",
        "",
        "Examples:",
        "- Default compression: compress_memory()",
        "- Preview changes: dry_run=True",
        "- Aggressive compression: max_age_days=14, importance_threshold=0.5",
        "",
        "Use this periodically to keep your conversation memory manageable and efficient.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("compression-stats")
async def get_compression_stats_prompt() -> str:
    """Get detailed statistics about memory compression history and current database status."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Compression Statistics",
        "",
        "Get detailed statistics about memory compression history and current database status.",
        "",
        "This command will:",
        "- Show last compression run details",
        "- Display space savings and compression ratios",
        "- Report current database size and conversation count",
        "- Show number of consolidated conversations",
        "- Provide compression efficiency metrics",
        "",
        "Use this to monitor memory usage and compression effectiveness.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("retention-policy")
async def get_retention_policy_prompt() -> str:
    """Configure memory retention policy parameters for automatic compression."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Retention Policy",
        "",
        "Configure memory retention policy parameters for automatic compression.",
        "",
        "This command will:",
        "- Set maximum conversation age and count limits",
        "- Configure importance threshold for retention",
        "- Define consolidation age triggers",
        "- Adjust compression ratio targets",
        "",
        "Examples:",
        "- Conservative: max_age_days=365, importance_threshold=0.2",
        "- Aggressive: max_age_days=90, importance_threshold=0.5",
        "- Custom: consolidation_age_days=14",
        "",
        "Use this to customize how your conversation memory is managed over time.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("auto-load-context")
async def get_auto_load_context_prompt() -> str:
    """Automatically detect current development context and load relevant conversations."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Auto-Context Loading",
        "",
        "Automatically detect current development context and load relevant conversations.",
        "",
        "This command will:",
        "- Analyze your current project structure and files",
        "- Detect programming languages and tools in use",
        "- Identify project type (web app, CLI tool, library, etc.)",
        "- Find recent file modifications",
        "- Load conversations relevant to your current context",
        "- Score conversations by relevance to current work",
        "",
        "Examples:",
        "- Load default context: auto_load_context()",
        "- Increase results: max_conversations=20",
        "- Lower threshold: min_relevance=0.2",
        "",
        "Use this at the start of coding sessions to get relevant context automatically.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("context-summary")
async def get_context_summary_prompt() -> str:
    """Get a quick summary of your current development context without loading conversations."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Context Summary",
        "",
        "Get a quick summary of your current development context without loading conversations.",
        "",
        "This command will:",
        "- Detect current project name and type",
        "- Identify programming languages and tools",
        "- Show Git repository information",
        "- Display recently modified files",
        "- Calculate detection confidence score",
        "",
        "Use this to understand what context the system has detected about your current work.",
    )
    return "\n".join(prompt_lines)
4857# Register enhanced search prompts
@mcp.prompt("search-code")
async def get_search_code_prompt() -> str:
    """Search for code patterns in conversations using AST parsing."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Code Pattern Search",
        "",
        "Search for code patterns in your conversation history using AST (Abstract Syntax Tree) parsing.",
        "",
        "This command will:",
        "- Parse Python code blocks from conversations",
        "- Extract functions, classes, imports, loops, and other patterns",
        "- Rank results by relevance to your query",
        "- Show code context and project information",
        "",
        "Examples:",
        "- Search for functions: pattern_type='function'",
        "- Search for class definitions: pattern_type='class'",
        "- Search for error handling: query='try except'",
        "",
        "Use this to find code examples and patterns from your development sessions.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("search-errors")
async def get_search_errors_prompt() -> str:
    """Search for error patterns and debugging contexts in conversations."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Error Pattern Search",
        "",
        "Search for error messages, exceptions, and debugging contexts in your conversation history.",
        "",
        "This command will:",
        "- Find Python tracebacks and exceptions",
        "- Detect JavaScript errors and warnings",
        "- Identify debugging and testing contexts",
        "- Show error context and solutions",
        "",
        "Examples:",
        "- Find Python errors: error_type='python_exception'",
        "- Find import issues: query='ImportError'",
        "- Find debugging sessions: query='debug'",
        "",
        "Use this to quickly find solutions to similar errors you've encountered before.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("search-temporal")
async def get_search_temporal_prompt() -> str:
    """Search conversations within a specific time range using natural language."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Temporal Search",
        "",
        "Search your conversation history using natural language time expressions.",
        "",
        "This command will:",
        '- Parse time expressions like "yesterday", "last week", "2 days ago"',
        "- Find conversations within that time range",
        "- Optionally filter by additional search terms",
        "- Sort results by time and relevance",
        "",
        "Examples:",
        '- "yesterday" - conversations from yesterday',
        '- "last week" - conversations from the past week',
        '- "2 days ago" - conversations from 2 days ago',
        '- "this month" + query - filter by content within the month',
        "",
        "Use this to find recent discussions or work from specific time periods.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("start-app-monitoring")
async def get_start_app_monitoring_prompt() -> str:
    """Start monitoring IDE activity and browser documentation usage."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Start Application Monitoring",
        "",
        "Monitor your development activity to provide better context and insights.",
        "",
        "This command will:",
        "- Start file system monitoring for code changes",
        "- Track application focus (IDE, browser, terminal)",
        "- Monitor documentation site visits",
        "- Build activity profiles for better context",
        "",
        "Monitoring includes:",
        "- File modifications in your project directories",
        "- IDE and editor activity patterns",
        "- Browser navigation to documentation sites",
        "- Application focus and context switching",
        "",
        "Use this to automatically capture your development context for better session insights.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("stop-app-monitoring")
async def get_stop_app_monitoring_prompt() -> str:
    """Stop all application monitoring."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Stop Application Monitoring",
        "",
        "Stop monitoring your development activity.",
        "",
        "This command will:",
        "- Stop file system monitoring",
        "- Stop application focus tracking",
        "- Preserve collected activity data",
        "- Clean up monitoring resources",
        "",
        "Use this when you want to pause monitoring or when you're done with a development session.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("activity-summary")
async def get_activity_summary_prompt() -> str:
    """Get activity summary for recent development work."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Activity Summary",
        "",
        "Get a comprehensive summary of your recent development activity.",
        "",
        "This command will:",
        "- Show file modification patterns",
        "- List most active applications",
        "- Display visited documentation sites",
        "- Calculate productivity metrics",
        "",
        "Summary includes:",
        "- Event counts by type and application",
        "- Most actively edited files",
        "- Documentation resources consulted",
        "- Average relevance scores",
        "",
        "Use this to understand your development patterns and identify productive sessions.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("context-insights")
async def get_context_insights_prompt() -> str:
    """Get contextual insights from recent activity."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Context Insights",
        "",
        "Analyze recent development activity for contextual insights.",
        "",
        "This command will:",
        "- Identify primary focus areas",
        "- Detect technologies being used",
        "- Count context switches",
        "- Calculate productivity scores",
        "",
        "Insights include:",
        "- Primary application focus",
        "- Active programming languages",
        "- Documentation topics explored",
        "- Project switching patterns",
        "- Overall productivity assessment",
        "",
        "Use this to understand your current development context and optimize your workflow.",
    )
    return "\n".join(prompt_lines)
@mcp.prompt("active-files")
async def get_active_files_prompt() -> str:
    """Get files currently being worked on."""
    # One entry per output line; empty entries are the blank separator lines.
    prompt_lines = (
        "# Active Files",
        "",
        "Show files that are currently being actively worked on.",
        "",
        "This command will:",
        "- List recently modified files",
        "- Show activity scores and patterns",
        "- Highlight most frequently changed files",
        "- Include project context",
        "",
        "File activity is scored based on:",
        "- Frequency of modifications",
        "- Recency of changes",
        "- File type and relevance",
        "- Project context",
        "",
        "Use this to quickly see what you're currently working on and resume interrupted tasks.",
    )
    return "\n".join(prompt_lines)
# Module-level cache for the lazily created application monitor
_app_monitor = None


async def get_app_monitor() -> ApplicationMonitor | None:
    """Return the shared application monitor, creating it on first use.

    Returns None when application monitoring is not available.
    """
    global _app_monitor
    if not APP_MONITOR_AVAILABLE:
        return None

    if _app_monitor is not None:
        return _app_monitor

    storage_dir = Path.home() / ".claude" / "data" / "app_monitoring"
    # Use the PWD environment variable when set, otherwise the process cwd.
    current_dir = os.environ.get("PWD", os.getcwd())
    watched_paths = []
    if Path(current_dir).exists():
        watched_paths.append(current_dir)
    _app_monitor = ApplicationMonitor(str(storage_dir), watched_paths)
    return _app_monitor
@mcp.tool()
async def start_app_monitoring(project_paths: list[str] | None = None) -> str:
    """Start monitoring IDE activity and browser documentation usage.

    Args:
        project_paths: Optional list of project directories to monitor

    Returns:
        A multi-line status report, or a message starting with "❌" on failure.

    """
    if not APP_MONITOR_AVAILABLE:
        return "❌ Application monitoring not available. Install dependencies: pip install watchdog psutil"

    def mark(flag: Any) -> str:
        # Render a truthy/falsy capability flag as a check or cross.
        return "✅" if flag else "❌"

    try:
        monitor = await get_app_monitor()
        if not monitor:
            return "❌ Failed to initialize application monitor"

        if project_paths:
            # Keep the monitor and its IDE monitor pointed at the same paths.
            monitor.project_paths = project_paths
            monitor.ide_monitor.project_paths = project_paths

        started = await monitor.start_monitoring()

        return "\n".join(
            (
                "🎯 Application monitoring started",
                f"📁 Monitoring {len(monitor.project_paths)} project paths",
                f"📝 IDE monitoring: {mark(started.get('ide_monitoring'))}",
                f"🔍 Watchdog available: {mark(started.get('watchdog_available'))}",
                f"📊 Process monitoring: {mark(started.get('psutil_available'))}",
            ),
        )

    except Exception as e:
        return f"❌ Error starting monitoring: {e}"
@mcp.tool()
async def stop_app_monitoring() -> str:
    """Stop all application monitoring."""
    if not APP_MONITOR_AVAILABLE:
        return "❌ Application monitoring not available"

    try:
        monitor = await get_app_monitor()
        # Nothing to stop when there is no monitor or it is not running.
        if not (monitor and monitor.monitoring_active):
            return "ℹ️ Application monitoring was not active"

        await monitor.stop_monitoring()
        return "✅ Application monitoring stopped"
    except Exception as e:
        return f"❌ Error stopping monitoring: {e}"
@mcp.tool()
async def get_activity_summary(hours: int = 2) -> str:
    """Get activity summary for the specified number of hours.

    Args:
        hours: Number of hours to look back (default: 2)

    Returns:
        A multi-line report, or a message starting with "❌" on failure.

    """
    if not APP_MONITOR_AVAILABLE:
        return "❌ Application monitoring not available"

    try:
        monitor = await get_app_monitor()
        if not monitor:
            return "❌ Application monitor not initialized"

        summary = monitor.get_activity_summary(hours)

        report = [
            f"📊 Activity Summary (Last {hours} hours)",
            f"🎯 Total events: {summary['total_events']}",
            f"📱 Average relevance: {summary['average_relevance']:.2f}",
            "",
        ]

        # Each section is emitted only when it has content.
        event_types = summary["event_types"]
        if event_types:
            report.append("📋 Event Types:")
            report.extend(f" • {kind}: {total}" for kind, total in event_types.items())
            report.append("")

        applications = summary["applications"]
        if applications:
            report.append("💻 Applications:")
            report.extend(f" • {name}: {total}" for name, total in applications.items())
            report.append("")

        active_files = summary["active_files"]
        if active_files:
            report.append("📄 Most Active Files:")
            # Only the first five entries are listed.
            report.extend(
                f" • {entry['file_path']} (score: {entry['activity_score']})"
                for entry in active_files[:5]
            )
            report.append("")

        doc_sites = summary["documentation_sites"]
        if doc_sites:
            report.append("📖 Documentation Sites Visited:")
            report.extend(f" • {site}" for site in doc_sites)

        return "\n".join(report)

    except Exception as e:
        return f"❌ Error getting activity summary: {e}"
@mcp.tool()
async def get_context_insights(hours: int = 1) -> str:
    """Get contextual insights from recent activity.

    Args:
        hours: Number of hours to analyze (default: 1)

    Returns:
        A multi-line report, or a message starting with "❌" on failure.

    """
    if not APP_MONITOR_AVAILABLE:
        return "❌ Application monitoring not available"

    try:
        monitor = await get_app_monitor()
        if not monitor:
            return "❌ Application monitor not initialized"

        insights = monitor.get_context_insights(hours)
        report = [f"🧠 Context Insights (Last {hours} hours)", ""]

        focus = insights["primary_focus"]
        if focus:
            report.append(f"🎯 Primary Focus: {focus}")

        technologies = insights["technologies_used"]
        if technologies:
            report.append(f"💻 Technologies: {', '.join(technologies)}")

        projects = insights["active_projects"]
        if projects:
            report.append(f"📁 Active Projects: {len(projects)}")
            # Show only the directory name of the first three projects.
            report.extend(f" • {Path(project).name}" for project in list(projects)[:3])

        topics = insights["documentation_topics"]
        if topics:
            report.append("📖 Documentation Topics:")
            report.extend(f" • {topic}" for topic in topics[:5])

        report += [
            "",
            f"🔄 Context Switches: {insights['context_switches']}",
            f"⚡ Productivity Score: {insights['productivity_score']:.2f}",
        ]

        return "\n".join(report)

    except Exception as e:
        return f"❌ Error getting context insights: {e}"
@mcp.tool()
async def get_active_files(minutes: int = 60) -> str:
    """Get files currently being worked on.

    Args:
        minutes: Number of minutes to look back (default: 60)

    Returns:
        A multi-line listing of up to ten files, or a message starting
        with "❌" on failure.

    """
    if not APP_MONITOR_AVAILABLE:
        return "❌ Application monitoring not available"

    try:
        monitor = await get_app_monitor()
        if not monitor:
            return "❌ Application monitor not initialized"

        recent = monitor.ide_monitor.get_active_files(minutes)
        if not recent:
            return f"📄 No active files found in the last {minutes} minutes"

        report = [f"📄 Active Files (Last {minutes} minutes)", ""]
        for entry in recent[:10]:
            path_text = entry["file_path"]
            # Paths longer than 50 characters are shortened to the file name.
            shown = path_text if len(path_text) <= 50 else Path(path_text).name
            activity = entry["activity_score"]
            events = entry["event_count"]
            report.append(f"• {shown} (score: {activity}, events: {events})")

        return "\n".join(report)

    except Exception as e:
        return f"❌ Error getting active files: {e}"
# Module-level cache for the lazily created LLM manager
_llm_manager = None


async def get_llm_manager() -> LLMManager | None:
    """Return the shared LLM manager, creating it on first use.

    Returns None when LLM providers are not available.
    """
    global _llm_manager
    if not LLM_PROVIDERS_AVAILABLE:
        return None

    if _llm_manager is not None:
        return _llm_manager

    config_file = Path.home() / ".claude" / "data" / "llm_config.json"
    # Pass the config path only when the file actually exists.
    config_arg = None
    if config_file.exists():
        config_arg = str(config_file)
    _llm_manager = LLMManager(config_arg)
    return _llm_manager
@mcp.tool()
async def list_llm_providers() -> str:
    """List all available LLM providers and their models."""
    if not LLM_PROVIDERS_AVAILABLE:
        return "❌ LLM providers not available. Install dependencies: pip install openai google-generativeai aiohttp"

    try:
        manager = await get_llm_manager()
        if not manager:
            return "❌ Failed to initialize LLM manager"

        available = await manager.get_available_providers()
        details = manager.get_provider_info()

        report = ["🤖 Available LLM Providers", ""]

        for name, info in details["providers"].items():
            is_available = name in available
            report.append(f"{'✅' if is_available else '❌'} {name.title()}")

            if is_available:
                models = info["models"]
                # List at most five models, then summarize the remainder.
                report.extend(f" • {model}" for model in models[:5])
                if len(models) > 5:
                    report.append(f" • ... and {len(models) - 5} more")
            report.append("")

        settings = details["config"]
        report += [
            f"🎯 Default Provider: {settings['default_provider']}",
            f"🔄 Fallback Providers: {', '.join(settings['fallback_providers'])}",
        ]

        return "\n".join(report)

    except Exception as e:
        return f"❌ Error listing providers: {e}"
@mcp.tool()
async def test_llm_providers() -> str:
    """Test all LLM providers to check their availability and functionality."""
    if not LLM_PROVIDERS_AVAILABLE:
        return "❌ LLM providers not available"

    try:
        manager = await get_llm_manager()
        if not manager:
            return "❌ Failed to initialize LLM manager"

        results = await manager.test_providers()

        report = ["🧪 LLM Provider Test Results", ""]

        for name, outcome in results.items():
            label = name.title()
            if not outcome["available"]:
                report.append(f"❌ {label}: Not available")
                report.append(f" Error: {outcome.get('error', 'Unknown error')}")
            elif outcome["test_successful"]:
                report.append(f"✅ {label}: Working")
                report.append(f" Model: {outcome.get('model', 'Unknown')}")
                report.append(f" Response: {outcome.get('response_length', 0)} chars")
            else:
                # Reachable provider whose test request did not succeed.
                report.append(f"⚠️ {label}: Available but test failed")
                report.append(f" Error: {outcome.get('error', 'Unknown error')}")
            report.append("")

        return "\n".join(report)

    except Exception as e:
        return f"❌ Error testing providers: {e}"
@mcp.tool()
async def generate_with_llm(
    prompt: str,
    provider: str | None = None,
    model: str | None = None,
    temperature: float = 0.7,
    max_tokens: int | None = None,
    use_fallback: bool = True,
) -> str:
    """Generate text using specified LLM provider.

    Args:
        prompt: The text prompt to generate from
        provider: LLM provider to use (openai, gemini, ollama)
        model: Specific model to use
        temperature: Generation temperature (0.0-1.0)
        max_tokens: Maximum tokens to generate
        use_fallback: Whether to use fallback providers if primary fails

    Returns:
        The generated text with provider, usage and timestamp details, or
        a message starting with "❌" on failure.

    """
    if not LLM_PROVIDERS_AVAILABLE:
        return "❌ LLM providers not available"

    try:
        manager = await get_llm_manager()
        if not manager:
            return "❌ Failed to initialize LLM manager"

        # The prompt is sent as a single user message.
        reply = await manager.generate(
            messages=[LLMMessage(role="user", content=prompt)],
            provider=provider,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            use_fallback=use_fallback,
        )

        return "\n".join(
            (
                f"🤖 Generated by {reply.provider} ({reply.model})",
                "",
                reply.content,
                "",
                f"📊 Usage: {reply.usage.get('total_tokens', 0)} tokens",
                f"⏱️ Generated: {reply.timestamp}",
            ),
        )

    except Exception as e:
        return f"❌ Error generating text: {e}"
@mcp.tool()
async def chat_with_llm(
    messages: list[dict[str, str]],
    provider: str | None = None,
    model: str | None = None,
    temperature: float = 0.7,
    max_tokens: int | None = None,
) -> str:
    """Have a conversation with an LLM provider.

    Args:
        messages: List of messages in format [{"role": "user/assistant/system", "content": "text"}]
        provider: LLM provider to use (openai, gemini, ollama)
        model: Specific model to use
        temperature: Generation temperature (0.0-1.0)
        max_tokens: Maximum tokens to generate

    Returns:
        The model response with provider, usage and timestamp details, or
        a message starting with "❌" on invalid input or failure.

    """
    if not LLM_PROVIDERS_AVAILABLE:
        return "❌ LLM providers not available"

    try:
        manager = await get_llm_manager()
        if not manager:
            return "❌ Failed to initialize LLM manager"

        allowed_roles = ("user", "assistant", "system")

        # Validate each entry before wrapping it; the first bad one aborts.
        conversation = []
        for entry in messages:
            well_formed = (
                isinstance(entry, dict) and "role" in entry and "content" in entry
            )
            if not well_formed:
                return "❌ Invalid message format. Each message must have 'role' and 'content' fields"

            if entry["role"] not in allowed_roles:
                return "❌ Invalid role. Must be 'user', 'assistant', or 'system'"

            conversation.append(
                LLMMessage(role=entry["role"], content=entry["content"]),
            )

        reply = await manager.generate(
            messages=conversation,
            provider=provider,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
        )

        return "\n".join(
            (
                f"🤖 {reply.provider} ({reply.model}) response:",
                "",
                reply.content,
                "",
                f"📊 Usage: {reply.usage.get('total_tokens', 0)} tokens",
                f"⏱️ Generated: {reply.timestamp}",
            ),
        )

    except Exception as e:
        return f"❌ Error in chat: {e}"
@mcp.tool()
async def configure_llm_provider(
    provider: str,
    api_key: str | None = None,
    base_url: str | None = None,
    default_model: str | None = None,
) -> str:
    """Configure an LLM provider with API credentials and settings.

    Only the settings that are supplied (non-empty) are written; other
    values already stored in the config file are kept. Because the file
    can contain API keys, it is created with, and tightened to,
    owner-only read/write permissions where the platform supports it.
    The cached LLM manager is cleared afterwards.

    Args:
        provider: Provider name (openai, gemini, ollama)
        api_key: API key for the provider
        base_url: Base URL for the provider API
        default_model: Default model to use

    Returns:
        A confirmation message, or a message starting with "❌" on failure.

    """
    global _llm_manager

    if not LLM_PROVIDERS_AVAILABLE:
        return "❌ LLM providers not available"

    try:
        config_path = Path.home() / ".claude" / "data" / "llm_config.json"
        config_path.parent.mkdir(parents=True, exist_ok=True)

        # Load existing config
        config = {}
        if config_path.exists():
            with open(config_path, encoding="utf-8") as f:
                config = json.load(f)

        # Initialize structure and merge only the supplied settings
        provider_settings = config.setdefault("providers", {}).setdefault(provider, {})
        supplied = {
            "api_key": api_key,
            "base_url": base_url,
            "default_model": default_model,
        }
        provider_settings.update(
            {key: value for key, value in supplied.items() if value},
        )

        # Save configuration. The 0o600 mode applies when the file is
        # created, so secrets are never briefly readable by other users.
        fd = os.open(config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=2)

        # A pre-existing file keeps its old mode, so tighten it explicitly.
        try:
            os.chmod(config_path, 0o600)
        except OSError:
            # Best effort: the write already succeeded and some platforms or
            # filesystems do not honor POSIX permission bits.
            pass

        # Drop the cached manager so the next get_llm_manager() call
        # creates a fresh one.
        _llm_manager = None

        return f"✅ Configuration updated for {provider} provider"

    except Exception as e:
        return f"❌ Error configuring provider: {e}"
# Module-level cache for the lazily created serverless session manager
_serverless_manager = None


async def get_serverless_manager() -> ServerlessSessionManager | None:
    """Return the shared serverless session manager, creating it on first use.

    Returns None when serverless mode is not available.
    """
    global _serverless_manager
    if not SERVERLESS_MODE_AVAILABLE:
        return None

    if _serverless_manager is not None:
        return _serverless_manager

    config_file = Path.home() / ".claude" / "data" / "serverless_config.json"
    # Pass the config path only when the file actually exists.
    config_arg = None
    if config_file.exists():
        config_arg = str(config_file)
    settings = ServerlessConfigManager.load_config(config_arg)
    backend = ServerlessConfigManager.create_storage_backend(settings)
    _serverless_manager = ServerlessSessionManager(backend)
    return _serverless_manager
@mcp.tool()
async def create_serverless_session(
    user_id: str,
    project_id: str,
    session_data: dict[str, Any] | None = None,
    ttl_hours: int = 24,
) -> str:
    """Create a new serverless session with external storage.

    Args:
        user_id: User identifier for the session
        project_id: Project identifier for the session
        session_data: Optional metadata for the session
        ttl_hours: Time-to-live in hours (default: 24)

    Returns:
        A confirmation containing the new session id and TTL, or a message
        starting with "❌" on failure.

    """
    if not SERVERLESS_MODE_AVAILABLE:
        return "❌ Serverless mode not available. Install dependencies: pip install redis boto3"

    try:
        manager = await get_serverless_manager()
        if not manager:
            return "❌ Failed to initialize serverless manager"

        new_session_id = await manager.create_session(
            user_id=user_id,
            project_id=project_id,
            session_data=session_data,
            ttl_hours=ttl_hours,
        )

        confirmation = (
            f"✅ Created serverless session: {new_session_id}",
            f"🕐 TTL: {ttl_hours} hours",
        )
        return "\n".join(confirmation)

    except Exception as e:
        return f"❌ Error creating session: {e}"
@mcp.tool()
async def get_serverless_session(session_id: str) -> str:
    """Get serverless session state.

    Args:
        session_id: Session identifier to retrieve

    Returns:
        A multi-line description of the session, or a message starting
        with "❌" when it is missing or retrieval fails.

    """
    if not SERVERLESS_MODE_AVAILABLE:
        return "❌ Serverless mode not available"

    try:
        manager = await get_serverless_manager()
        if not manager:
            return "❌ Failed to initialize serverless manager"

        state = await manager.get_session(session_id)
        if not state:
            return f"❌ Session not found: {session_id}"

        report = [
            f"📊 Session: {session_id}",
            f"👤 User: {state.user_id}",
            f"📁 Project: {state.project_id}",
            f"🕐 Created: {state.created_at}",
            f"⏰ Last Activity: {state.last_activity}",
            f"🔐 Permissions: {len(state.permissions)}",
            f"💬 Conversations: {len(state.conversation_history)}",
            f"💾 Size: {state.get_compressed_size()} bytes (compressed)",
        ]

        # Metadata is appended as pretty-printed JSON only when present.
        metadata = state.metadata
        if metadata:
            report.append(f"📋 Metadata: {json.dumps(metadata, indent=2)}")

        return "\n".join(report)

    except Exception as e:
        return f"❌ Error getting session: {e}"
@mcp.tool()
async def update_serverless_session(
    session_id: str,
    updates: dict[str, Any],
    ttl_hours: int | None = None,
) -> str:
    """Update serverless session state.

    Args:
        session_id: Session identifier to update
        updates: Dictionary of updates to apply
        ttl_hours: Optional new TTL in hours

    Returns:
        A confirmation listing the updated keys and their value types, or
        a message starting with "❌" on failure.

    """
    if not SERVERLESS_MODE_AVAILABLE:
        return "❌ Serverless mode not available"

    try:
        manager = await get_serverless_manager()
        if not manager:
            return "❌ Failed to initialize serverless manager"

        updated = await manager.update_session(session_id, updates, ttl_hours)
        if not updated:
            return f"❌ Failed to update session {session_id}"

        # Summarize each change as "key: type name" instead of echoing values.
        changes = [f"{key}: {type(value).__name__}" for key, value in updates.items()]
        message = f"✅ Updated session {session_id}\n📝 Changes: {', '.join(changes)}"
        if ttl_hours:
            message += f"\n🕐 New TTL: {ttl_hours} hours"
        return message

    except Exception as e:
        return f"❌ Error updating session: {e}"
@mcp.tool()
async def delete_serverless_session(session_id: str) -> str:
    """Delete a serverless session.

    Args:
        session_id: Session identifier to delete

    Returns:
        A confirmation, or a message starting with "❌" when the session
        could not be deleted.

    """
    if not SERVERLESS_MODE_AVAILABLE:
        return "❌ Serverless mode not available"

    try:
        manager = await get_serverless_manager()
        if not manager:
            return "❌ Failed to initialize serverless manager"

        deleted = await manager.delete_session(session_id)
        return (
            f"✅ Deleted session {session_id}"
            if deleted
            else f"❌ Session not found or failed to delete: {session_id}"
        )

    except Exception as e:
        return f"❌ Error deleting session: {e}"
@mcp.tool()
async def list_serverless_sessions(
    user_id: str | None = None,
    project_id: str | None = None,
) -> str:
    """List serverless sessions by user or project.

    When both filters are given, only sessions matching both are listed.
    With no filter, every stored session is listed.

    Args:
        user_id: Filter by user ID (optional)
        project_id: Filter by project ID (optional)

    Returns:
        A listing with details for up to ten sessions, or a message
        starting with "❌" on failure.

    """
    if not SERVERLESS_MODE_AVAILABLE:
        return "❌ Serverless mode not available"

    try:
        manager = await get_serverless_manager()
        if not manager:
            return "❌ Failed to initialize serverless manager"

        if user_id and project_id:
            # Apply both filters: keep the user's sessions (in their
            # original order) that also belong to the project.
            user_sessions = await manager.list_user_sessions(user_id)
            project_sessions = set(await manager.list_project_sessions(project_id))
            session_ids = [sid for sid in user_sessions if sid in project_sessions]
            filter_desc = f"user '{user_id}' and project '{project_id}'"
        elif user_id:
            session_ids = await manager.list_user_sessions(user_id)
            filter_desc = f"user '{user_id}'"
        elif project_id:
            session_ids = await manager.list_project_sessions(project_id)
            filter_desc = f"project '{project_id}'"
        else:
            # List all sessions (expensive operation)
            session_ids = await manager.storage.list_sessions()
            filter_desc = "all users/projects"

        if not session_ids:
            return f"📭 No sessions found for {filter_desc}"

        output = [f"📋 Sessions for {filter_desc} ({len(session_ids)} found)", ""]

        # Get details for first few sessions; ids whose state can no longer
        # be loaded are skipped.
        for session_id in session_ids[:10]:
            session_state = await manager.get_session(session_id)
            if session_state:
                output.append(
                    f"• {session_id[:12]}... (user: {session_state.user_id}, project: {session_state.project_id})",
                )

        if len(session_ids) > 10:
            output.append(f"... and {len(session_ids) - 10} more")

        return "\n".join(output)

    except Exception as e:
        return f"❌ Error listing sessions: {e}"
@mcp.tool()
async def test_serverless_storage() -> str:
    """Test serverless storage backends for availability."""
    if not SERVERLESS_MODE_AVAILABLE:
        return "❌ Serverless mode not available"

    try:
        config_file = Path.home() / ".claude" / "data" / "serverless_config.json"
        # Pass the config path only when the file actually exists.
        config_arg = None
        if config_file.exists():
            config_arg = str(config_file)
        config = ServerlessConfigManager.load_config(config_arg)

        results = await ServerlessConfigManager.test_storage_backends(config)

        report = ["🧪 Storage Backend Test Results", ""]
        for backend_name, is_up in results.items():
            if is_up:
                report.append(f"✅ {backend_name.title()}: Available")
            else:
                report.append(f"❌ {backend_name.title()}: Not available")

        # Show current configuration; "local" is assumed when none is set.
        selected = config.get("storage_backend", "local")
        if results.get(selected, False):
            selected_state = "✅ Available"
        else:
            selected_state = "❌ Not available"

        report += ["", f"🎯 Current Backend: {selected} ({selected_state})"]

        return "\n".join(report)

    except Exception as e:
        return f"❌ Error testing storage: {e}"
@mcp.tool()
async def cleanup_serverless_sessions() -> str:
    """Clean up expired serverless sessions and report the outcome.

    Calls ``cleanup_sessions()`` on the serverless manager, then reads
    ``get_session_stats()`` for the cache size and backend name.

    Returns:
        A newline-joined summary, or a "❌ ..." message when serverless mode
        is unavailable, the manager cannot be obtained, or a call raises.
    """
    if not SERVERLESS_MODE_AVAILABLE:
        return "❌ Serverless mode not available"

    try:
        manager = await get_serverless_manager()
        if not manager:
            return "❌ Failed to initialize serverless manager"

        removed = await manager.cleanup_sessions()
        # Stats are read after the cleanup so they reflect the new state.
        stats = manager.get_session_stats()

        return "\n".join(
            (
                "🧹 Session Cleanup Results",
                f"🗑️ Cleaned up: {removed} expired sessions",
                f"💾 Active sessions in cache: {stats['cached_sessions']}",
                f"🏗️ Storage backend: {stats['storage_backend']}",
            ),
        )

    except Exception as e:
        return f"❌ Error during cleanup: {e}"
@mcp.tool()
async def configure_serverless_storage(
    backend: str,
    config_updates: dict[str, Any],
) -> str:
    """Configure serverless storage backend settings.

    Merges ``config_updates`` into ``config["backends"][backend]``, selects
    ``backend`` as ``config["storage_backend"]``, writes the result to
    ``~/.claude/data/serverless_config.json`` and clears the cached
    serverless manager so it is rebuilt on next use.

    Args:
        backend: Storage backend (redis, s3, local)
        config_updates: Configuration updates to apply

    Returns:
        A confirmation naming the saved config path, or a "❌ ..." message
        when serverless mode is unavailable or any step raises.
    """
    global _serverless_manager

    if not SERVERLESS_MODE_AVAILABLE:
        return "❌ Serverless mode not available"

    try:
        config_path = Path.home() / ".claude" / "data" / "serverless_config.json"
        config_path.parent.mkdir(parents=True, exist_ok=True)

        # Start from the existing configuration when a file is present.
        existing = str(config_path) if config_path.exists() else None
        config = ServerlessConfigManager.load_config(existing)

        # Create the nested sections on demand, then merge the updates.
        backends = config.setdefault("backends", {})
        backends.setdefault(backend, {}).update(config_updates)
        config["storage_backend"] = backend

        with open(config_path, "w") as handle:
            json.dump(config, handle, indent=2)

        # Drop the cached manager so the next caller picks up the new config.
        _serverless_manager = None

        return (
            f"✅ Configured {backend} storage backend\n📁 Config saved to {config_path}"
        )

    except Exception as e:
        return f"❌ Error configuring storage: {e}"
5840# Team Knowledge Base Tools
@mcp.tool()
async def create_team_user(
    user_id: str,
    username: str,
    email: str | None = None,
    role: str = "contributor",
) -> str:
    """Create a new team user with specified role.

    Delegates to ``team_knowledge.create_team_user`` and formats the returned
    record (user id, username, role, creation time) as a text summary. The
    email line is shown only when ``email`` is given.

    Returns:
        The newline-joined summary, or a "❌ ..." message when the
        team-knowledge module cannot be imported or the call raises.
    """
    try:
        from .team_knowledge import create_team_user as _create_team_user

        created = await _create_team_user(user_id, username, email, role)

        report = [
            "👤 Team user created successfully!",
            f"🆔 User ID: {created['user_id']}",
            f"👥 Username: {created['username']}",
            f"🏷️ Role: {created['role']}",
        ]
        if email:
            report.append(f"📧 Email: {email}")
        report += [
            f"📅 Created: {created['created_at']}",
            "🎉 User can now participate in team knowledge sharing",
        ]

        return "\n".join(report)

    except ImportError:
        return "❌ Team knowledge tools not available"
    except Exception as e:
        return f"❌ Error creating team user: {e}"
@mcp.tool()
async def create_team(team_id: str, name: str, description: str, owner_id: str) -> str:
    """Create a new team for knowledge sharing.

    Delegates to ``team_knowledge.create_team`` and renders the returned
    record as a text summary.

    Returns:
        The newline-joined summary, or a "❌ ..." message when the
        team-knowledge module cannot be imported or the call raises.
    """
    try:
        from .team_knowledge import create_team as _create_team

        team = await _create_team(team_id, name, description, owner_id)

        return "\n".join(
            [
                "🏆 Team created successfully!",
                f"🆔 Team ID: {team['team_id']}",
                f"📛 Name: {team['name']}",
                f"📝 Description: {team['description']}",
                f"👑 Owner: {team['owner_id']}",
                f"👥 Members: {team['member_count']}",
                f"📁 Projects: {team['project_count']}",
                f"📅 Created: {team['created_at']}",
                "🎯 Team is ready for collaborative knowledge sharing",
            ],
        )

    except ImportError:
        return "❌ Team knowledge tools not available"
    except Exception as e:
        return f"❌ Error creating team: {e}"
@mcp.tool()
async def add_team_reflection(
    content: str,
    author_id: str,
    tags: list[str] | None = None,
    access_level: str = "team",
    team_id: str | None = None,
    project_id: str | None = None,
) -> str:
    """Add reflection to team knowledge base with access control.

    Delegates to ``team_knowledge.add_team_reflection`` and echoes the new
    reflection id together with the supplied metadata. Team, project and tag
    lines appear only when those arguments are truthy; the content preview is
    the first 100 characters followed by "...".

    Returns:
        The newline-joined confirmation, or a "❌ ..." message when the
        team-knowledge module cannot be imported or the call raises.
    """
    try:
        from .team_knowledge import add_team_reflection as _add_team_reflection

        new_id = await _add_team_reflection(
            content,
            author_id,
            tags,
            access_level,
            team_id,
            project_id,
        )

        report = [
            "💡 Team reflection added successfully!",
            f"🆔 Reflection ID: {new_id}",
            f"👤 Author: {author_id}",
            f"🔒 Access Level: {access_level}",
        ]
        if team_id:
            report.append(f"👥 Team: {team_id}")
        if project_id:
            report.append(f"📁 Project: {project_id}")
        if tags:
            report.append(f"🏷️ Tags: {', '.join(tags)}")
        report += [
            f"📝 Content: {content[:100]}...",
            "🌟 Reflection is now available for team collaboration",
        ]

        return "\n".join(report)

    except ImportError:
        return "❌ Team knowledge tools not available"
    except Exception as e:
        return f"❌ Error adding team reflection: {e}"
@mcp.tool()
async def search_team_knowledge(
    query: str,
    user_id: str,
    team_id: str | None = None,
    project_id: str | None = None,
    tags: list[str] | None = None,
    limit: int = 20,
) -> str:
    """Search team reflections with access control.

    Delegates to ``team_knowledge.search_team_knowledge`` and renders every
    returned result as a numbered entry (id, author, access level, votes,
    optional team and tags, first 200 characters of content, creation time).

    Returns:
        The newline-joined listing, a "no results" hint when the search
        returns nothing, or a "❌ ..." message when the team-knowledge module
        cannot be imported or the call raises.
    """
    try:
        from .team_knowledge import search_team_knowledge as _search_team_knowledge

        matches = await _search_team_knowledge(
            query,
            user_id,
            team_id,
            project_id,
            tags,
            limit,
        )

        if not matches:
            return f"🔍 No team knowledge found for query: '{query}'\n💡 Try adjusting search terms or access permissions."

        # Header: echo the search criteria that were actually supplied.
        report = [
            f"🧠 Found {len(matches)} team reflections for: '{query}'",
            f"👤 User: {user_id}",
        ]
        if team_id:
            report.append(f"👥 Team: {team_id}")
        if project_id:
            report.append(f"📁 Project: {project_id}")
        if tags:
            report.append(f"🏷️ Tags: {', '.join(tags)}")
        report.append("=" * 50)

        for position, match in enumerate(matches, 1):
            report += [
                f"\n#{position}",
                f"🆔 ID: {match['id']}",
                f"👤 Author: {match['author_id']}",
                f"🔒 Access: {match['access_level']}",
                f"👍 Votes: {match['votes']}",
            ]
            if match.get("team_id"):
                report.append(f"👥 Team: {match['team_id']}")
            if match.get("tags"):
                report.append(f"🏷️ Tags: {', '.join(match['tags'])}")
            report += [
                f"📝 {match['content'][:200]}...",
                f"📅 {match['created_at']}",
            ]

        return "\n".join(report)

    except ImportError:
        return "❌ Team knowledge tools not available"
    except Exception as e:
        return f"❌ Error searching team knowledge: {e}"
@mcp.tool()
async def join_team(user_id: str, team_id: str, requester_id: str | None = None) -> str:
    """Join a team or add user to team.

    Delegates to ``team_knowledge.join_team``. When a ``requester_id``
    different from ``user_id`` is given, the confirmation also names who
    added the user.

    Returns:
        A success summary, a failure message when the helper returns a falsy
        result, or a "❌ ..." message when the team-knowledge module cannot
        be imported or the call raises.
    """
    try:
        from .team_knowledge import join_team as _join_team

        joined = await _join_team(user_id, team_id, requester_id)

        if not joined:
            return (
                f"❌ Failed to join team {team_id}. Check permissions and team existence."
            )

        report = [
            "🎉 Successfully joined team!",
            f"👤 User: {user_id}",
            f"👥 Team: {team_id}",
        ]
        if requester_id and requester_id != user_id:
            report.append(f"👑 Added by: {requester_id}")
        report.append(
            "🌟 You can now access team reflections and contribute knowledge",
        )
        return "\n".join(report)

    except ImportError:
        return "❌ Team knowledge tools not available"
    except Exception as e:
        return f"❌ Error joining team: {e}"
@mcp.tool()
async def get_team_statistics(team_id: str, user_id: str) -> str:
    """Get team statistics and activity.

    Delegates to ``team_knowledge.get_team_statistics`` and renders the team
    record, member/project counts, reflection aggregates and recent activity.
    Falsy aggregate values are displayed as zero.

    Returns:
        The newline-joined report, an access-failure message when the helper
        returns a falsy result, or a "❌ ..." message when the team-knowledge
        module cannot be imported or the call raises.
    """
    try:
        from .team_knowledge import get_team_statistics as _get_team_statistics

        stats = await _get_team_statistics(team_id, user_id)

        if not stats:
            return f"❌ Cannot access team {team_id}. Check permissions and team existence."

        team = stats["team"]
        reflections = stats["reflection_stats"]

        report = [
            f"📊 Team Statistics: {team['name']}",
            f"🆔 Team ID: {team['team_id']}",
            f"📝 Description: {team['description']}",
            f"👑 Owner: {team['owner_id']}",
            f"📅 Created: {team['created_at']}",
            "=" * 50,
            f"👥 Members: {stats['member_count']}",
            f"📁 Projects: {stats['project_count']}",
            f"💡 Total Reflections: {reflections['total_reflections'] or 0}",
            f"✍️ Active Contributors: {reflections['active_contributors'] or 0}",
            f"👍 Total Votes: {reflections['total_votes'] or 0}",
        ]

        # A falsy average (None or 0) is shown as a literal 0.0.
        if reflections["avg_votes"]:
            report.append(f"📈 Avg Votes/Reflection: {reflections['avg_votes']:.1f}")
        else:
            report.append("📈 Avg Votes/Reflection: 0.0")

        report.append(
            f"⚡ Recent Activity (7 days): {stats['recent_activity']['recent_reflections']} reflections",
        )

        return "\n".join(report)

    except ImportError:
        return "❌ Team knowledge tools not available"
    except Exception as e:
        return f"❌ Error getting team statistics: {e}"
@mcp.tool()
async def get_user_team_permissions(user_id: str) -> str:
    """Get user's permissions and team memberships.

    Delegates to ``team_knowledge.get_user_team_permissions`` and renders the
    user record, the list of teams, each entry of the user's ``permissions``
    mapping and the ``can_create_teams`` / ``can_moderate`` flags.

    Returns:
        The newline-joined report, a "not found" message when the helper
        returns a falsy result, or a "❌ ..." message when the team-knowledge
        module cannot be imported or the call raises.
    """
    try:
        from .team_knowledge import (
            get_user_team_permissions as _get_user_team_permissions,
        )

        perms = await _get_user_team_permissions(user_id)

        if not perms:
            return f"❌ User {user_id} not found in team knowledge base"

        user = perms["user"]
        memberships = perms["teams"]

        report = [
            f"👤 User Permissions: {user['username']}",
            f"🆔 User ID: {user['user_id']}",
            f"🏷️ Role: {user['role']}",
        ]
        if user.get("email"):
            report.append(f"📧 Email: {user['email']}")
        report += [
            f"📅 Created: {user['created_at']}",
            f"⏰ Last Active: {user['last_active']}",
            "=" * 50,
            f"👥 Teams ({len(memberships)}):",
        ]

        for membership in memberships:
            report.append(f" • {membership['name']} ({membership['team_id']})")
            if membership.get("description"):
                report.append(f" 📝 {membership['description'][:80]}...")

        report.append("\n🔒 Permissions:")
        for key, granted in user.get("permissions", {}).items():
            mark = "✅" if granted else "❌"
            label = key.replace("_", " ").title()
            report.append(f" {mark} {label}")

        report.append("\n🎯 Special Abilities:")
        if perms["can_create_teams"]:
            report.append(" ✅ Can create teams")
        if perms["can_moderate"]:
            report.append(" ✅ Can moderate content")

        return "\n".join(report)

    except ImportError:
        return "❌ Team knowledge tools not available"
    except Exception as e:
        return f"❌ Error getting user permissions: {e}"
@mcp.tool()
async def vote_on_reflection(
    reflection_id: str,
    user_id: str,
    vote_delta: int = 1,
) -> str:
    """Vote on a team reflection (upvote/downvote).

    Delegates to ``team_knowledge.vote_on_reflection``. A positive
    ``vote_delta`` is reported as an upvote; any other value is reported as a
    downvote.

    Returns:
        A success summary, a failure message when the helper returns a falsy
        result, or a "❌ ..." message when the team-knowledge module cannot
        be imported or the call raises.
    """
    try:
        from .team_knowledge import vote_on_reflection as _vote_on_reflection

        recorded = await _vote_on_reflection(reflection_id, user_id, vote_delta)

        if not recorded:
            return f"❌ Failed to vote on reflection {reflection_id}. Check permissions and reflection existence."

        if vote_delta > 0:
            verb, emoji = "upvoted", "👍"
        else:
            verb, emoji = "downvoted", "👎"

        return "\n".join(
            [
                f"{emoji} Reflection {verb} successfully!",
                f"🆔 Reflection ID: {reflection_id}",
                f"👤 Voter: {user_id}",
                f"📊 Vote Delta: {vote_delta:+d}",
                "🌟 Your vote helps surface valuable team knowledge",
            ],
        )

    except ImportError:
        return "❌ Team knowledge tools not available"
    except Exception as e:
        return f"❌ Error voting on reflection: {e}"
6160# Natural Language Scheduling Tools
@mcp.tool()
async def create_natural_reminder(
    title: str,
    time_expression: str,
    description: str = "",
    user_id: str = "default",
    project_id: str | None = None,
    notification_method: str = "session",
) -> str:
    """Create reminder from natural language time expression.

    Delegates to ``natural_scheduler.create_natural_reminder``. A falsy
    reminder id from the helper is reported as a failure to parse
    ``time_expression``.

    Returns:
        A confirmation echoing the reminder details, a parse-failure hint, or
        a "❌ ..." message when the scheduler module cannot be imported or
        the call raises.
    """
    try:
        from .natural_scheduler import (
            create_natural_reminder as _create_natural_reminder,
        )

        new_id = await _create_natural_reminder(
            title,
            time_expression,
            description,
            user_id,
            project_id,
            notification_method,
        )

        if not new_id:
            return f"❌ Failed to parse time expression: '{time_expression}'\n💡 Try formats like 'in 30 minutes', 'tomorrow at 9am', 'every day at 5pm'"

        report = [
            "⏰ Natural reminder created successfully!",
            f"🆔 Reminder ID: {new_id}",
            f"📝 Title: {title}",
            f"📄 Description: {description}",
            f"🕐 When: {time_expression}",
            f"👤 User: {user_id}",
        ]
        if project_id:
            report.append(f"📁 Project: {project_id}")
        report += [
            f"📢 Notification: {notification_method}",
            "🎯 Reminder will trigger automatically at the scheduled time",
        ]
        return "\n".join(report)

    except ImportError:
        return "❌ Natural scheduling tools not available. Install: pip install python-dateutil schedule python-crontab"
    except Exception as e:
        return f"❌ Error creating reminder: {e}"
@mcp.tool()
async def list_user_reminders(
    user_id: str = "default",
    project_id: str | None = None,
) -> str:
    """List pending reminders for user/project.

    Delegates to ``natural_scheduler.list_user_reminders`` and renders each
    returned reminder as a numbered entry. Description, recurrence rule and
    context triggers are shown only when present and truthy.

    Returns:
        The newline-joined listing, a "no pending reminders" notice, or a
        "❌ ..." message when the scheduler module cannot be imported or the
        call raises.
    """
    try:
        from .natural_scheduler import list_user_reminders as _list_user_reminders

        reminders = await _list_user_reminders(user_id, project_id)

        if not reminders:
            empty_notice = [
                "📋 No pending reminders found",
                f"👤 User: {user_id}",
            ]
            if project_id:
                empty_notice.append(f"📁 Project: {project_id}")
            empty_notice.append(
                "💡 Use 'create_natural_reminder' to set up time-based reminders",
            )
            return "\n".join(empty_notice)

        report = [
            f"⏰ Found {len(reminders)} pending reminders",
            f"👤 User: {user_id}",
        ]
        if project_id:
            report.append(f"📁 Project: {project_id}")
        report.append("=" * 50)

        for position, item in enumerate(reminders, 1):
            report += [
                f"\n#{position}",
                f"🆔 ID: {item['id']}",
                f"📝 Title: {item['title']}",
            ]
            if item["description"]:
                report.append(f"📄 Description: {item['description']}")
            report += [
                f"🔄 Type: {item['reminder_type'].replace('_', ' ').title()}",
                f"📊 Status: {item['status'].replace('_', ' ').title()}",
                f"🕐 Scheduled: {item['scheduled_for']}",
                f"📅 Created: {item['created_at']}",
            ]
            if item.get("recurrence_rule"):
                report.append(f"🔁 Recurrence: {item['recurrence_rule']}")
            if item.get("context_triggers"):
                report.append(f"🎯 Triggers: {', '.join(item['context_triggers'])}")

        return "\n".join(report)

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error listing reminders: {e}"
@mcp.tool()
async def cancel_user_reminder(reminder_id: str) -> str:
    """Cancel a specific reminder.

    Delegates to ``natural_scheduler.cancel_user_reminder``.

    Returns:
        A confirmation, a failure message when the helper returns a falsy
        result, or a "❌ ..." message when the scheduler module cannot be
        imported or the call raises.
    """
    try:
        from .natural_scheduler import cancel_user_reminder as _cancel_user_reminder

        cancelled = await _cancel_user_reminder(reminder_id)

        if not cancelled:
            return f"❌ Failed to cancel reminder {reminder_id}. Check that the ID is correct and the reminder exists."

        return "\n".join(
            [
                "❌ Reminder cancelled successfully!",
                f"🆔 Reminder ID: {reminder_id}",
                "🚫 The reminder will no longer trigger",
                "💡 You can create a new reminder if needed",
            ],
        )

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error cancelling reminder: {e}"
@mcp.tool()
async def check_due_reminders() -> str:
    """Check for reminders that are due now.

    Delegates to ``natural_scheduler.check_due_reminders`` and renders each
    due reminder with its details and how long it has been overdue.

    Returns:
        The newline-joined report, an "all clear" message when nothing is
        due, or a "❌ ..." message when the scheduler module cannot be
        imported or the call raises.
    """
    try:
        # Imported once per call instead of once per reminder.
        from datetime import datetime

        from .natural_scheduler import check_due_reminders as _check_due_reminders

        due_reminders = await _check_due_reminders()

        if not due_reminders:
            return "✅ No reminders are currently due\n⏰ All scheduled reminders are in the future"

        output = [
            f"🚨 {len(due_reminders)} reminders are DUE NOW!",
            "=" * 50,
        ]

        for i, reminder in enumerate(due_reminders, 1):
            output.append(f"\n🔥 #{i} OVERDUE")
            output.append(f"🆔 ID: {reminder['id']}")
            output.append(f"📝 Title: {reminder['title']}")
            if reminder["description"]:
                output.append(f"📄 Description: {reminder['description']}")
            output.append(f"🕐 Scheduled: {reminder['scheduled_for']}")
            output.append(f"👤 User: {reminder['user_id']}")
            if reminder.get("project_id"):
                output.append(f"📁 Project: {reminder['project_id']}")

            # Best-effort: a malformed timestamp must not hide the reminder.
            try:
                scheduled = datetime.fromisoformat(reminder["scheduled_for"])
                # Match the timestamp's awareness: subtracting a naive "now"
                # from a timezone-aware value raises TypeError. For a naive
                # timestamp tzinfo is None, i.e. plain local time as before.
                now = datetime.now(scheduled.tzinfo)
                overdue_seconds = (now - scheduled).total_seconds()
                if overdue_seconds > 0:
                    hours, remainder = divmod(int(overdue_seconds), 3600)
                    minutes = remainder // 60
                    if hours > 0:
                        output.append(f"⏱️ Overdue: {hours}h {minutes}m")
                    else:
                        output.append(f"⏱️ Overdue: {minutes}m")
            except (TypeError, ValueError):
                # fromisoformat: ValueError for a bad string, TypeError for a
                # non-string value.
                output.append("⏱️ Overdue: calculation failed")

        output.append(
            "\n💡 These reminders should be processed by the background scheduler",
        )
        return "\n".join(output)

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error checking due reminders: {e}"
@mcp.tool()
async def start_reminder_service() -> str:
    """Start the background reminder service.

    Calls ``natural_scheduler.register_session_notifications()`` and then
    ``natural_scheduler.start_reminder_service()``.

    Returns:
        A fixed confirmation message, or a "❌ ..." message when the
        scheduler module cannot be imported or either call raises.
    """
    try:
        from .natural_scheduler import (
            register_session_notifications,
            start_reminder_service as _start_reminder_service,
        )

        # Notifications are registered before the service is started.
        register_session_notifications()
        _start_reminder_service()

        return "\n".join(
            [
                "🚀 Natural reminder service started!",
                "⏰ Background scheduler is now active",
                "🔍 Checking for due reminders every minute",
                "📢 Session notifications are registered",
                "💡 Reminders will automatically trigger at their scheduled times",
                "🛑 Use 'stop_reminder_service' to stop the background service",
            ],
        )

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error starting reminder service: {e}"
@mcp.tool()
async def stop_reminder_service() -> str:
    """Stop the background reminder service.

    Calls ``natural_scheduler.stop_reminder_service()``.

    Returns:
        A fixed confirmation message, or a "❌ ..." message when the
        scheduler module cannot be imported or the call raises.
    """
    try:
        from .natural_scheduler import stop_reminder_service as _stop_reminder_service

        _stop_reminder_service()

        return "\n".join(
            [
                "🛑 Natural reminder service stopped",
                "❌ Background scheduler is no longer active",
                "⚠️ Existing reminders will not trigger automatically",
                "🚀 Use 'start_reminder_service' to restart the service",
                "💡 You can still check due reminders manually with 'check_due_reminders'",
            ],
        )

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error stopping reminder service: {e}"
6401# Smart Interruption Management Tools
@mcp.tool()
async def start_interruption_monitoring(
    working_directory: str = ".",
    watch_files: bool = True,
) -> str:
    """Start smart interruption monitoring with context switch detection.

    Delegates to ``interruption_manager.start_interruption_monitoring`` with
    the given directory and file-watching flag.

    Returns:
        A confirmation echoing the directory and whether file watching is
        enabled, or a "❌ ..." message when the interruption-manager module
        cannot be imported or the call raises.
    """
    try:
        from .interruption_manager import (
            start_interruption_monitoring as _start_interruption_monitoring,
        )

        await _start_interruption_monitoring(working_directory, watch_files)

        watching = "enabled" if watch_files else "disabled"
        return "\n".join(
            [
                "🛡️ Smart interruption monitoring started!",
                f"📁 Working directory: {working_directory}",
                f"📄 File watching: {watching}",
                "👁️ Focus tracking: active",
                "💾 Auto-save: enabled (30s threshold)",
                "🔄 Context preservation: automatic",
                "💡 Your work context will be automatically preserved during interruptions",
                "🛑 Use 'stop_interruption_monitoring' to disable",
            ],
        )

    except ImportError:
        return "❌ Interruption management tools not available. Install: pip install psutil watchdog"
    except Exception as e:
        return f"❌ Error starting interruption monitoring: {e}"
@mcp.tool()
async def stop_interruption_monitoring() -> str:
    """Stop interruption monitoring.

    Calls ``interruption_manager.stop_interruption_monitoring()``.

    Returns:
        A fixed confirmation message, or a "❌ ..." message when the
        interruption-manager module cannot be imported or the call raises.
    """
    try:
        from .interruption_manager import (
            stop_interruption_monitoring as _stop_interruption_monitoring,
        )

        _stop_interruption_monitoring()

        return "\n".join(
            [
                "🛑 Interruption monitoring stopped",
                "❌ Focus tracking: disabled",
                "❌ File watching: disabled",
                "❌ Auto-save: disabled",
                "⚠️ Context preservation is now manual only",
                "🚀 Use 'start_interruption_monitoring' to restart",
            ],
        )

    except ImportError:
        return "❌ Interruption management tools not available"
    except Exception as e:
        return f"❌ Error stopping interruption monitoring: {e}"
@mcp.tool()
async def create_session_context(
    user_id: str,
    project_id: str | None = None,
    working_directory: str = ".",
) -> str:
    """Create new session context for interruption management.

    Delegates to ``interruption_manager.create_session_context`` and reports
    the session id it returns.

    Returns:
        A confirmation echoing the session id and arguments, or a "❌ ..."
        message when the interruption-manager module cannot be imported or
        the call raises.
    """
    try:
        from .interruption_manager import (
            create_session_context as _create_session_context,
        )

        new_session = await _create_session_context(
            user_id,
            project_id,
            working_directory,
        )

        report = [
            "🎯 Session context created successfully!",
            f"🆔 Session ID: {new_session}",
            f"👤 User: {user_id}",
        ]
        if project_id:
            report.append(f"📁 Project: {project_id}")
        report += [
            f"📂 Working directory: {working_directory}",
            "💾 Context will be automatically preserved on interruptions",
            "🔄 Session state is now being tracked",
        ]

        return "\n".join(report)

    except ImportError:
        return "❌ Interruption management tools not available"
    except Exception as e:
        return f"❌ Error creating session context: {e}"
@mcp.tool()
async def preserve_current_context(
    session_id: str | None = None,
    force: bool = False,
) -> str:
    """Manually preserve current session context.

    Delegates to ``interruption_manager.preserve_current_context``. The
    session id line is included only when ``session_id`` is given, and the
    preservation type is reported as "forced" when ``force`` is true.

    Returns:
        A confirmation, a failure message when the helper returns a falsy
        result, or a "❌ ..." message when the interruption-manager module
        cannot be imported or the call raises.
    """
    try:
        from .interruption_manager import (
            preserve_current_context as _preserve_current_context,
        )

        preserved = await _preserve_current_context(session_id, force)

        if not preserved:
            return "❌ Failed to preserve context. Check that a session context exists."

        report = ["💾 Session context preserved successfully!"]
        if session_id:
            report.append(f"🆔 Session ID: {session_id}")
        mode = "forced" if force else "automatic"
        report += [
            f"🔧 Preservation type: {mode}",
            "📸 Context snapshot created",
            "🗜️ Data compressed for storage",
            "✅ Recovery is now possible if interruption occurs",
        ]
        return "\n".join(report)

    except ImportError:
        return "❌ Interruption management tools not available"
    except Exception as e:
        return f"❌ Error preserving context: {e}"
@mcp.tool()
async def restore_session_context(session_id: str) -> str:
    """Restore session context from snapshot.

    Delegates to ``interruption_manager.restore_session_context`` and renders
    the returned context mapping (user, optional project, working directory,
    interruption and recovery counters, focus duration, open-file count).

    Returns:
        The newline-joined summary, a failure message when the helper returns
        a falsy result, or a "❌ ..." message when the interruption-manager
        module cannot be imported or the call raises.
    """
    try:
        from .interruption_manager import (
            restore_session_context as _restore_session_context,
        )

        restored = await _restore_session_context(session_id)

        if not restored:
            return f"❌ Failed to restore context for session {session_id}. Check that the session exists and has preserved data."

        report = [
            "🔄 Session context restored successfully!",
            f"🆔 Session ID: {session_id}",
            f"👤 User: {restored['user_id']}",
        ]
        if restored.get("project_id"):
            report.append(f"📁 Project: {restored['project_id']}")
        report += [
            f"📂 Working directory: {restored['working_directory']}",
            f"📊 Interruption count: {restored['interruption_count']}",
            f"🔄 Recovery attempts: {restored['recovery_attempts']}",
            f"⏱️ Focus duration: {restored['focus_duration']:.1f}s",
        ]
        if restored.get("open_files"):
            report.append(f"📄 Open files: {len(restored['open_files'])}")
        report.append("✅ Context is now active and being monitored")
        return "\n".join(report)

    except ImportError:
        return "❌ Interruption management tools not available"
    except Exception as e:
        return f"❌ Error restoring context: {e}"
@mcp.tool()
async def get_interruption_history(user_id: str, hours: int = 24) -> str:
    """Get recent interruption history for user.

    Delegates to ``interruption_manager.get_interruption_history`` and renders
    a per-type count, the number of auto-saved events, the summed
    ``duration`` values and the first five events of the returned list.

    Returns:
        The newline-joined report, a "no interruptions" notice when the
        history is empty, or a "❌ ..." message when the interruption-manager
        module cannot be imported or the call raises.
    """
    try:
        from .interruption_manager import (
            get_interruption_history as _get_interruption_history,
        )

        history = await _get_interruption_history(user_id, hours)

        if not history:
            return "\n".join(
                [
                    "📋 No interruptions found",
                    f"👤 User: {user_id}",
                    f"🕐 Time range: last {hours} hours",
                    "✅ Your work sessions have been uninterrupted!",
                ],
            )

        report = [
            f"📊 Found {len(history)} interruptions in the last {hours} hours",
            f"👤 User: {user_id}",
            "=" * 50,
        ]

        # Single pass over the history to gather the summary figures.
        counts_by_kind = {}
        duration_sum = 0
        saved_count = 0
        for entry in history:
            kind = entry["event_type"]
            counts_by_kind[kind] = counts_by_kind.get(kind, 0) + 1
            if entry.get("duration"):
                duration_sum += entry["duration"]
            if entry.get("auto_saved"):
                saved_count += 1

        report.append("📈 Summary:")
        report.extend(
            f" • {kind.replace('_', ' ').title()}: {tally}"
            for kind, tally in counts_by_kind.items()
        )

        report.append(f"💾 Auto-saved interruptions: {saved_count}/{len(history)}")
        if duration_sum > 0:
            report.append(f"⏱️ Total focus time: {duration_sum:.1f}s")

        # Only the first five entries of the list are shown individually.
        report.append("\n🕐 Recent interruptions:")
        for position, entry in enumerate(history[:5], 1):
            label = entry["event_type"].replace("_", " ").title()
            when = entry["timestamp"]
            marker = "💾" if entry.get("auto_saved") else "📝"
            if entry.get("duration"):
                duration_note = f" ({entry['duration']:.1f}s)"
            else:
                duration_note = ""
            report.append(f" {position}. {marker} {label}{duration_note} - {when}")

        hidden = len(history) - 5
        if hidden > 0:
            report.append(f" ... and {hidden} more")

        return "\n".join(report)

    except ImportError:
        return "❌ Interruption management tools not available"
    except Exception as e:
        return f"❌ Error getting interruption history: {e}"
6634@mcp.tool()
6635def _format_statistics_header(user_id: str) -> list[str]:
6636 """Format the header section for interruption statistics."""
6637 return ["📊 Interruption Management Statistics", f"👤 User: {user_id}", "=" * 50]
6640def _format_session_statistics(sessions: dict) -> list[str]:
6641 """Format session management statistics section."""
6642 if not sessions:
6643 return []
6645 output = ["🎯 Session Management:"]
6646 output.append(f" • Total sessions: {sessions.get('total_sessions', 0)}")
6647 output.append(f" • Preserved sessions: {sessions.get('preserved_sessions', 0)}")
6648 output.append(f" • Restored sessions: {sessions.get('restored_sessions', 0)}")
6650 avg_restore = sessions.get("avg_restore_count", 0)
6651 output.append(f" • Average restore count: {avg_restore:.1f}")
6653 return output
6656def _format_interruption_type_details(by_type: list) -> list[str]:
6657 """Format interruption type details section."""
6658 if not by_type:
6659 return []
6661 output = [" • By type:"]
6663 for type_stat in by_type:
6664 type_name = type_stat.get("event_type", "").replace("_", " ").title()
6665 count = type_stat.get("type_count", 0)
6666 auto_saved = type_stat.get("auto_saved_interruptions", 0)
6667 avg_duration = type_stat.get("avg_duration", 0)
6669 output.append(f" - {type_name}: {count}")
6670 if auto_saved > 0:
6671 output.append(f" (💾 {auto_saved} auto-saved)")
6672 if avg_duration:
6673 output.append(f" (⏱️ avg: {avg_duration:.1f}s)")
6675 return output
6678def _format_interruption_statistics(interruptions: dict) -> list[str]:
6679 """Format interruption statistics section."""
6680 if not interruptions:
6681 return []
6683 total_interruptions = interruptions.get("total", 0)
6684 output = ["\n⚡ Interruption Summary:"]
6685 output.append(f" • Total interruptions: {total_interruptions}")
6687 by_type = interruptions.get("by_type", [])
6688 output.extend(_format_interruption_type_details(by_type))
6690 return output
6693def _format_snapshot_statistics(snapshots: dict) -> list[str]:
6694 """Format snapshot statistics section."""
6695 if not snapshots:
6696 return []
6698 total_snapshots = snapshots.get("total_snapshots", 0)
6699 total_size = snapshots.get("total_size", 0)
6700 avg_size = snapshots.get("avg_size", 0)
6702 output = ["\n📸 Context Snapshots:"]
6703 output.append(f" • Total snapshots: {total_snapshots}")
6705 if total_size:
6706 size_mb = total_size / (1024 * 1024)
6707 output.append(f" • Total storage: {size_mb:.2f} MB")
6709 if avg_size:
6710 avg_kb = avg_size / 1024
6711 output.append(f" • Average size: {avg_kb:.1f} KB")
6713 return output
6716def _calculate_efficiency_rates(
6717 sessions: dict,
6718 interruptions: dict,
6719 by_type: list,
6720) -> tuple[float, float]:
6721 """Calculate preservation and auto-save rates."""
6722 preservation_rate = (
6723 sessions.get("preserved_sessions", 0)
6724 / max(sessions.get("total_sessions", 1), 1)
6725 ) * 100
6727 auto_save_rate = 0
6728 total_interruptions = interruptions.get("total", 0)
6729 if by_type and total_interruptions > 0:
6730 total_auto_saved = sum(t.get("auto_saved_interruptions", 0) for t in by_type)
6731 auto_save_rate = (total_auto_saved / max(total_interruptions, 1)) * 100
6733 return preservation_rate, auto_save_rate
6736def _format_efficiency_metrics(
6737 sessions: dict,
6738 interruptions: dict,
6739 by_type: list,
6740) -> list[str]:
6741 """Format efficiency metrics section."""
6742 if not (sessions and interruptions):
6743 return []
6745 preservation_rate, auto_save_rate = _calculate_efficiency_rates(
6746 sessions,
6747 interruptions,
6748 by_type,
6749 )
6751 return [
6752 "\n📈 Efficiency Metrics:",
6753 f" • Context preservation rate: {preservation_rate:.1f}%",
6754 f" • Auto-save success rate: {auto_save_rate:.1f}%",
6755 ]
6758def _has_statistics_data(sessions: dict, interruptions: dict, snapshots: dict) -> bool:
6759 """Check if there's any statistics data available."""
6760 return any(
6761 [
6762 sessions,
6763 interruptions.get("total", 0),
6764 snapshots.get("total_snapshots", 0),
6765 ],
6766 )
6769def _format_no_data_message(user_id: str) -> list[str]:
6770 """Format message when no statistics data is found."""
6771 return [
6772 "📊 No interruption management data found",
6773 f"👤 User: {user_id}",
6774 "💡 Start using interruption monitoring to see statistics",
6775 ]
async def get_interruption_statistics(user_id: str) -> str:
    """Get comprehensive interruption and context preservation statistics.

    Args:
        user_id: User whose statistics are fetched and reported.

    Returns:
        The formatted multi-section report, a "no data" message when none of
        the sections carries data, or an error string when the interruption
        manager cannot be imported or any step raises.
    """
    try:
        # Imported lazily so a missing optional module yields a message
        # instead of breaking module import.
        from .interruption_manager import (
            get_interruption_statistics as _fetch_statistics,
        )

        stats = await _fetch_statistics(user_id)
        report = _format_statistics_header(user_id)

        sessions = stats.get("sessions", {})
        interruptions = stats.get("interruptions", {})
        snapshots = stats.get("snapshots", {})
        by_type = interruptions.get("by_type", [])

        report += _format_session_statistics(sessions)
        report += _format_interruption_statistics(interruptions)
        report += _format_snapshot_statistics(snapshots)
        report += _format_efficiency_metrics(sessions, interruptions, by_type)

        # With nothing to show, the whole report is replaced by the placeholder.
        if not _has_statistics_data(sessions, interruptions, snapshots):
            report = _format_no_data_message(user_id)

        return "\n".join(report)

    except ImportError:
        return "❌ Interruption management tools not available"
    except Exception as exc:
        return f"❌ Error getting statistics: {exc}"
6812# =====================================
6813# Crackerjack Integration MCP Tools
6814# =====================================
@mcp.tool()
async def execute_crackerjack_command(
    command: str,
    args: str = "",
    working_directory: str = ".",
    timeout: int = 300,
    ai_agent_mode: bool = False,
) -> str:
    """Execute a Crackerjack command with enhanced AI integration.

    Args:
        command: Crackerjack command to execute (analyze, check, test, lint, etc.)
        args: Additional command line arguments, split on whitespace
        working_directory: Directory to run command in
        timeout: Command timeout in seconds
        ai_agent_mode: Enable AI agent autonomous fixing mode

    Returns:
        A human-readable execution report (duration, exit code, parsed
        results, up to three memory insights and, on failure, an excerpt of
        stderr), or an error message when the integration is unavailable or
        the execution raises.

    """
    if not CRACKERJACK_INTEGRATION_AVAILABLE:
        return "❌ Crackerjack integration not available"

    try:
        integration = CrackerjackIntegration()
        args_list = args.split() if args else []
        result = await integration.execute_crackerjack_command(
            command,
            args_list,
            working_directory,
            timeout,
            ai_agent_mode,
        )

        output = [f"🔧 Crackerjack {command} execution complete"]
        if ai_agent_mode:
            output.append("🤖 AI Agent mode enabled")
        output.append(f"📁 Working directory: {working_directory}")
        output.append(f"⏱️ Duration: {result.execution_time:.2f}s")
        output.append(f"🎯 Exit code: {result.exit_code}")

        if result.parsed_data:
            output.append("\n📊 Parsed Results:")
            for key, value in result.parsed_data.items():
                if isinstance(value, dict) and value:
                    # Non-empty mappings are summarised by size only.
                    output.append(f" • {key}: {len(value)} items")
                elif isinstance(value, int | float) or value:
                    # Numbers are always shown (including 0); other values
                    # only when truthy.
                    output.append(f" • {key}: {value}")

        if result.memory_insights:
            output.append("\n💡 Memory Insights:")
            output.extend(
                f" • {insight}" for insight in result.memory_insights[:3]
            )  # Show top 3

        if result.exit_code != 0 and result.stderr:
            stderr_limit = 200
            excerpt = result.stderr[:stderr_limit]
            # Mark the excerpt as truncated only when text was actually cut.
            suffix = "..." if len(result.stderr) > stderr_limit else ""
            output.append("\n❌ Error output:")
            output.append(f" {excerpt}{suffix}")

        return "\n".join(output)

    except Exception as e:
        return f"❌ Error executing Crackerjack command: {e}"
6879# Clean Command Aliases
@mcp.tool()
async def crackerjack_run(
    command: str,
    args: str = "",
    working_directory: str = ".",
    timeout: int = 300,
    ai_agent_mode: bool = False,
) -> str:
    """Run crackerjack with enhanced analytics (replaces /crackerjack:run).

    Short alias that forwards every argument unchanged to
    ``execute_crackerjack_command``.

    Args:
        command: Crackerjack command to execute
        args: Additional command arguments
        working_directory: Directory to run in
        timeout: Command timeout in seconds
        ai_agent_mode: Enable autonomous AI fixing

    Returns:
        The report string produced by ``execute_crackerjack_command``.

    """
    report = await execute_crackerjack_command(
        command=command,
        args=args,
        working_directory=working_directory,
        timeout=timeout,
        ai_agent_mode=ai_agent_mode,
    )
    return report
@mcp.tool()
async def crackerjack_history(
    working_directory: str = ".",
    command_filter: str = "",
    days: int = 7,
) -> str:
    """View crackerjack execution history with trends and patterns.

    Short alias that forwards to ``get_crackerjack_results_history``.

    Args:
        working_directory: Directory whose history is requested
        command_filter: Optional command filter, passed through unchanged
        days: Size of the look-back window in days

    Returns:
        The report string produced by ``get_crackerjack_results_history``.
    """
    history_report = await get_crackerjack_results_history(
        working_directory=working_directory,
        command_filter=command_filter,
        days=days,
    )
    return history_report
@mcp.tool()
async def crackerjack_metrics(working_directory: str = ".", days: int = 30) -> str:
    """Get quality metrics trends from crackerjack execution history.

    Short alias that forwards to ``get_crackerjack_quality_metrics``.

    Args:
        working_directory: Directory whose metrics are requested
        days: Size of the look-back window in days

    Returns:
        The report string produced by ``get_crackerjack_quality_metrics``.
    """
    # Keyword arguments bind the directory and the day count to the matching
    # parameters of get_crackerjack_quality_metrics(working_directory, days);
    # positional (days, working_directory) would swap them.
    return await get_crackerjack_quality_metrics(
        working_directory=working_directory,
        days=days,
    )
@mcp.tool()
async def crackerjack_patterns(days: int = 7, working_directory: str = ".") -> str:
    """Analyze test failure patterns and trends.

    Short alias that forwards to ``analyze_crackerjack_test_patterns``.

    Args:
        days: Size of the look-back window in days
        working_directory: Directory the analysis is reported for

    Returns:
        The report string produced by ``analyze_crackerjack_test_patterns``.
    """
    # This alias takes (days, working_directory) but the target takes
    # (working_directory, days); keyword arguments keep each value bound to
    # the right parameter.
    return await analyze_crackerjack_test_patterns(
        working_directory=working_directory,
        days=days,
    )
@mcp.tool()
async def crackerjack_help() -> str:
    """Get comprehensive help for choosing the right crackerjack commands.

    Returns:
        A guide listing the enhanced commands (when the integration is
        available), the basic commands (when a ``crackerjack`` server is
        detected), general usage recommendations, and a setup notice when
        neither variant is available.
    """
    guide = ["🔧 CRACKERJACK COMMAND GUIDE", "=" * 50]

    # Work out which command families can be advertised.
    basic_available = _detect_other_mcp_servers().get("crackerjack", False)
    enhanced_available = CRACKERJACK_INTEGRATION_AVAILABLE

    if enhanced_available:
        guide += [
            "\n🎯 RECOMMENDED (Enhanced with AI Intelligence):",
            "• /session-mgmt:crackerjack-run check - Full analysis with memory integration",
            "• /session-mgmt:crackerjack-run test - Test execution with pattern tracking",
            "• /session-mgmt:crackerjack-run lint - Linting with trend analysis",
            "• /session-mgmt:crackerjack-history - View execution history and trends",
            "• /session-mgmt:crackerjack-metrics - Quality metrics over time",
            "• /session-mgmt:crackerjack-patterns - Analyze test failure patterns",
            "",
            "🧠 Enhanced Features:",
            " ✅ Conversation memory - remembers all results",
            " ✅ Intelligent insights - automated analysis",
            " ✅ Quality tracking - metrics over time",
            " ✅ Pattern detection - identifies recurring issues",
            " ✅ Trend analysis - shows improvement/degradation",
            " ✅ Cross-session learning - builds knowledge over time",
        ]

    if basic_available:
        guide += [
            "\n📋 BASIC (Simple Execution, No Memory):",
            "• /crackerjack:run check - Raw output only",
            "• /crackerjack:run test - Basic test execution",
            "• /crackerjack:run lint - Simple linting",
            "",
            "⚠️ Basic Features:",
            " ❌ No conversation memory",
            " ❌ No intelligent analysis",
            " ❌ No quality tracking",
            " ❌ No pattern detection",
        ]

    # The recommendations are shown regardless of what is installed.
    guide += [
        "\n💡 USAGE RECOMMENDATIONS:",
        "",
        "🚀 For Development Work:",
        " → Use enhanced commands (/session-mgmt:crackerjack-*)",
        " → Get AI insights and memory integration",
        " → Build long-term project intelligence",
        "",
        "⚙️ For CI/CD Pipelines:",
        " → Use basic commands (/crackerjack:*) if needed",
        " → Lighter weight, no persistence requirements",
        "",
        "🎯 Quick Start:",
        " 1. /session-mgmt:crackerjack-run check",
        " 2. /session-mgmt:crackerjack-history",
        " 3. /session-mgmt:crackerjack-metrics",
    ]

    if not (enhanced_available or basic_available):
        guide += [
            "\n❌ SETUP REQUIRED:",
            "Neither enhanced nor basic crackerjack is available.",
            "Install crackerjack to get started with code quality tools.",
        ]

    return "\n".join(guide)
@mcp.tool()
async def get_crackerjack_results_history(
    working_directory: str = ".",
    command_filter: str = "",
    days: int = 7,
) -> str:
    """Get recent Crackerjack command execution history.

    Args:
        working_directory: Directory whose results are requested
        command_filter: Optional command filter, echoed in the report when set
        days: Size of the look-back window in days

    Returns:
        A report covering at most the first ten results returned by the
        integration, a notice when there are none, or an error message when
        the integration is unavailable or the lookup raises.
    """
    if not CRACKERJACK_INTEGRATION_AVAILABLE:
        return "❌ Crackerjack integration not available"

    try:
        results = await CrackerjackIntegration().get_recent_crackerjack_results(
            working_directory,
            command_filter,
            days,
        )
        if not results:
            return f"📋 No Crackerjack results found in the last {days} days"

        report = [
            f"📋 Crackerjack Results History (last {days} days)",
            f"📁 Directory: {working_directory}",
        ]
        if command_filter:
            report.append(f"🔍 Filter: {command_filter}")

        for entry in results[:10]:  # Only the first ten entries are shown
            # A missing exit code is treated as a failure.
            succeeded = entry.get("exit_code", 1) == 0
            marker = "✅" if succeeded else "❌"
            name = entry.get("command", "unknown")
            when = entry.get("timestamp", "")
            elapsed = entry.get("duration", 0)

            report.append(f"\n{marker} {name} ({when})")
            report.append(f" ⏱️ {elapsed:.2f}s")

            # Key metrics are optional and shown only when present.
            parsed = entry.get("parsed_results", {})
            if not parsed:
                continue

            if "test_summary" in parsed:
                summary = parsed["test_summary"]
                passed = summary.get("passed", 0)
                failed = summary.get("failed", 0)
                report.append(f" 🧪 Tests: {passed} passed, {failed} failed")

            if "lint_issues" in parsed:
                issue_count = len(parsed["lint_issues"])
                if issue_count > 0:
                    report.append(f" 🔍 Lint: {issue_count} issues")

            if "coverage_percentage" in parsed:
                report.append(f" 📊 Coverage: {parsed['coverage_percentage']}%")

        return "\n".join(report)

    except Exception as exc:
        return f"❌ Error getting Crackerjack history: {exc}"
@mcp.tool()
async def get_crackerjack_quality_metrics(
    working_directory: str = ".",
    days: int = 30,
) -> str:
    """Get quality metrics trends from Crackerjack execution history.

    Args:
        working_directory: Directory whose metrics are requested
        days: Size of the look-back window in days

    Returns:
        A report with test, coverage, lint and security sections (each shown
        only when its data is present), a notice when no metrics exist, or an
        error message when the integration is unavailable or the lookup
        raises.
    """
    if not CRACKERJACK_INTEGRATION_AVAILABLE:
        return "❌ Crackerjack integration not available"

    def trend_arrow(delta):
        """Pick the icon for a positive, negative or flat trend value."""
        if delta > 0:
            return "📈"
        if delta < 0:
            return "📉"
        return "➡️"

    try:
        integration = CrackerjackIntegration()
        metrics = await integration.get_quality_metrics_history(working_directory, days)
        if not metrics:
            return f"📊 No quality metrics found in the last {days} days"

        report = [
            f"📊 Quality Metrics Trends (last {days} days)",
            f"📁 Directory: {working_directory}",
        ]

        # Test metrics
        test_data = metrics.get("test_metrics", {})
        if test_data:
            latest_tests = test_data.get("latest", {})
            test_trends = test_data.get("trends", {})

            report.append("\n🧪 Test Metrics:")
            if latest_tests:
                passed = latest_tests.get("passed", 0)
                failed = latest_tests.get("failed", 0)
                total = passed + failed
                # max(..., 1) avoids dividing by zero when no tests ran.
                pass_rate = (passed / max(total, 1)) * 100
                report.append(f" • Latest: {passed}/{total} passed ({pass_rate:.1f}%)")

            if test_trends.get("pass_rate_trend"):
                pass_trend = test_trends["pass_rate_trend"]
                report.append(
                    f" • Pass rate trend: {trend_arrow(pass_trend)} {pass_trend:+.1f}%"
                )

        # Coverage metrics
        coverage_data = metrics.get("coverage_metrics", {})
        if coverage_data:
            coverage_latest = coverage_data.get("latest", 0)
            coverage_trend = coverage_data.get("trend", 0)
            coverage_icon = trend_arrow(coverage_trend)

            report.append("\n📊 Coverage Metrics:")
            report.append(f" • Latest coverage: {coverage_latest}%")
            report.append(f" • Coverage trend: {coverage_icon} {coverage_trend:+.1f}%")

        # Lint metrics
        lint_data = metrics.get("lint_metrics", {})
        if lint_data:
            lint_latest = lint_data.get("latest_issues", 0)
            lint_trend = lint_data.get("issues_trend", 0)
            lint_icon = trend_arrow(lint_trend)

            report.append("\n🔍 Code Quality:")
            report.append(f" • Latest lint issues: {lint_latest}")
            report.append(f" • Issues trend: {lint_icon} {lint_trend:+.0f}")

        # Security metrics
        security_data = metrics.get("security_metrics", {})
        if security_data:
            security_latest = security_data.get("latest_issues", 0)
            severity = security_data.get("highest_severity", "none")

            report.append("\n🔒 Security Metrics:")
            report.append(f" • Latest security issues: {security_latest}")
            if severity != "none":
                report.append(f" • Highest severity: {severity}")

        # A non-empty payload without any known section still counts as "no data".
        if not (test_data or coverage_data or lint_data or security_data):
            report = [
                f"📊 No quality metrics available for {working_directory}",
                "💡 Run some Crackerjack commands to start tracking metrics",
            ]

        return "\n".join(report)

    except Exception as exc:
        return f"❌ Error getting quality metrics: {exc}"
7154def _add_frequent_failures_info(output: list[str], frequent_failures: list) -> None:
7155 """Add frequent failures information to output."""
7156 if frequent_failures:
7157 output.append("\n🔥 Most Frequent Failures:")
7158 for failure in frequent_failures[:5]:
7159 test_name = failure.get("test_name", "unknown")
7160 count = failure.get("failure_count", 0)
7161 last_error = failure.get("latest_error_type", "unknown")
7162 output.append(f" • {test_name} ({count}x) - {last_error}")
7165def _add_error_types_info(output: list[str], error_types: dict) -> None:
7166 """Add error type distribution information to output."""
7167 if error_types:
7168 output.append("\n📊 Error Type Distribution:")
7169 for error_type, count in sorted(
7170 error_types.items(),
7171 key=lambda x: x[1],
7172 reverse=True,
7173 )[:5]:
7174 output.append(f" • {error_type}: {count} occurrences")
7177def _add_trends_info(output: list[str], trends: dict) -> None:
7178 """Add trends information to output."""
7179 if not trends:
7180 return
7182 output.append("\n📈 Trends:")
7184 stability = trends.get("stability_score", 0)
7185 stability_emoji = "🟢" if stability > 80 else "🟡" if stability > 60 else "🔴"
7186 output.append(f" • Test stability: {stability_emoji} {stability:.1f}%")
7188 avg_duration = trends.get("average_test_duration", 0)
7189 if avg_duration > 0:
7190 output.append(f" • Average test duration: {avg_duration:.1f}s")
7192 if trends.get("most_problematic_files"):
7193 files = trends["most_problematic_files"][:3]
7194 output.append(" • Most problematic files:")
7195 for file_path in files:
7196 output.append(f" - {file_path}")
7199def _generate_test_recommendations(
7200 frequent_failures: list,
7201 error_types: dict,
7202 trends: dict,
7203) -> list[str]:
7204 """Generate recommendations based on test patterns."""
7205 recommendations = []
7207 if frequent_failures and len(frequent_failures) > 3:
7208 recommendations.append("Consider investigating the most frequent test failures")
7210 if error_types.get("AssertionError", 0) > error_types.get("ImportError", 0) * 2:
7211 recommendations.append(
7212 "High assertion failures suggest logic issues in tests or code",
7213 )
7215 if trends.get("stability_score", 100) < 70:
7216 recommendations.append(
7217 "Test stability is below 70% - consider test environment review",
7218 )
7220 return recommendations
@mcp.tool()
async def analyze_crackerjack_test_patterns(
    working_directory: str = ".",
    days: int = 7,
) -> str:
    """Analyze test failure patterns and trends.

    Args:
        working_directory: Directory named in the report. Within this
            function it is only echoed in the output; the pattern lookup
            receives ``days`` alone.
        days: Size of the look-back window in days

    Returns:
        A report with frequent failures, error-type distribution, trends and
        recommendations, a notice when there is nothing to analyze, or an
        error message when the integration is unavailable or the lookup
        raises.
    """
    if not CRACKERJACK_INTEGRATION_AVAILABLE:
        return "❌ Crackerjack integration not available"

    try:
        patterns = await CrackerjackIntegration().get_test_failure_patterns(days)
        if not patterns:
            return f"🧪 No test failure patterns found in the last {days} days"

        report = [
            f"🧪 Test Failure Pattern Analysis (last {days} days)",
            f"📁 Directory: {working_directory}",
        ]

        frequent_failures = patterns.get("frequent_failures", [])
        error_types = patterns.get("error_type_distribution", {})
        trends = patterns.get("trends", {})

        # Each helper appends its own section to the report in place.
        _add_frequent_failures_info(report, frequent_failures)
        _add_error_types_info(report, error_types)
        _add_trends_info(report, trends)

        advice = _generate_test_recommendations(frequent_failures, error_types, trends)
        if advice:
            report.append("\n💡 Recommendations:")
            report.extend(f" • {item}" for item in advice)

        # A payload with none of the three sections is reported as "nothing to analyze".
        if not (frequent_failures or error_types or trends):
            report = [
                "🧪 No test failure patterns to analyze",
                f"📁 Directory: {working_directory}",
                "💡 Run tests with failures to enable pattern analysis",
            ]

        return "\n".join(report)

    except Exception as exc:
        return f"❌ Error analyzing test patterns: {exc}"
@mcp.tool()
async def crackerjack_quality_trends(
    working_directory: str = ".",
    days: int = 30,
) -> str:
    """Analyze quality trends over time with actionable insights.

    Args:
        working_directory: Directory whose trends are requested
        days: Size of the look-back window in days

    Returns:
        A report with the overall direction, counts of improving / declining
        / stable metrics, per-metric details and up to five recommendations,
        or an error message when the integration is unavailable or the
        lookup raises.
    """
    if not CRACKERJACK_INTEGRATION_AVAILABLE:
        return "❌ Crackerjack integration not available"

    try:
        trends = await CrackerjackIntegration().get_quality_trends(working_directory, days)

        report = [
            f"📊 Quality Trends Analysis (last {days} days)",
            f"📁 Directory: {working_directory}",
        ]

        overall = trends.get("overall", {})
        headline = overall.get("overall_direction", "unknown").upper()
        report.append(f"\n🎯 Overall Direction: {headline}")

        improving = overall.get("improving_count", 0)
        declining = overall.get("declining_count", 0)
        stable = overall.get("stable_count", 0)
        report += [
            f" 📈 Improving: {improving} metrics",
            f" 📉 Declining: {declining} metrics",
            f" ➡️ Stable: {stable} metrics",
        ]

        # Individual metric trends
        per_metric = trends.get("trends", {})
        if per_metric:
            report.append("\n📋 Metric Details:")
            for metric_key, details in per_metric.items():
                direction = details.get("direction", "unknown")
                recent_avg = details.get("recent_average", 0)
                change = details.get("change", 0)
                strength = details.get("trend_strength", "unknown")

                if direction == "improving":
                    icon = "📈"
                elif direction == "declining":
                    icon = "📉"
                else:
                    icon = "➡️"
                # "test_pass_rate" -> "Test Pass Rate"
                title = metric_key.replace("_", " ").title()

                report.append(
                    f" {icon} {title}: {recent_avg:.1f}% (±{change:.1f}%, {strength})"
                )

        # Recommendations
        advice = trends.get("recommendations", [])
        if advice:
            report.append("\n💡 Recommendations:")
            report.extend(f" • {item}" for item in advice[:5])  # Show top 5

        return "\n".join(report)

    except Exception as exc:
        return f"❌ Error analyzing quality trends: {exc}"
@mcp.tool()
async def crackerjack_health_check() -> str:
    """Check Crackerjack integration health and provide diagnostics.

    Returns:
        A report with the overall status, per-component availability, any
        diagnostics supplied by the integration and, when the status is not
        "healthy", quick-fix suggestions; or an error message when the
        integration is unavailable or the check raises.
    """
    if not CRACKERJACK_INTEGRATION_AVAILABLE:
        return "❌ Crackerjack integration not available - check installation"

    try:
        health = await CrackerjackIntegration().health_check()

        report = ["🏥 Crackerjack Integration Health Check"]

        status = health.get("status", "unknown")
        if status == "healthy":
            status_line = "✅ Status: HEALTHY"
        elif status == "partial":
            status_line = "⚠️ Status: PARTIAL (some features may be limited)"
        else:
            status_line = "❌ Status: UNHEALTHY"
        report.append(status_line)

        # Detailed checks
        cli_ok = health.get("crackerjack_available")
        db_ok = health.get("database_accessible")

        report.append("\n🔍 Component Status:")
        report.append(
            " ✅ Crackerjack CLI: Available"
            if cli_ok
            else " ❌ Crackerjack CLI: Not available"
        )
        report.append(
            " ✅ Integration Database: Accessible"
            if db_ok
            else " ❌ Integration Database: Not accessible"
        )

        # Recommendations
        diagnostics = health.get("recommendations", [])
        if diagnostics:
            report.append("\n💡 Diagnostics:")
            report.extend(f" • {item}" for item in diagnostics)

        # Quick fix suggestions
        if status != "healthy":
            report.append("\n🔧 Quick Fixes:")
            if not cli_ok:
                report.append(" → Install crackerjack: uv add crackerjack")
            if not db_ok:
                report.append(" → Check ~/.claude/data/ directory permissions")
            report.append(" → Try running a simple crackerjack command first")

        return "\n".join(report)

    except Exception as exc:
        return f"❌ Health check failed: {exc}"
7393def _format_monitoring_status(quality_data: dict) -> list[str]:
7394 """Format the monitoring status section."""
7395 output = []
7396 if quality_data["monitoring_active"]:
7397 output.append("✅ Quality monitoring: ACTIVE")
7398 else:
7399 output.append("⚠️ Quality monitoring: LIMITED")
7400 return output
7403def _format_quality_trend(quality_data: dict) -> list[str]:
7404 """Format the quality trend analysis section."""
7405 output = []
7406 trend = quality_data["quality_trend"]
7407 if trend == "improving":
7408 output.append(f"📈 Quality trend: {trend.upper()} ✅")
7409 elif trend == "declining":
7410 output.append(f"📉 Quality trend: {trend.upper()} ⚠️")
7411 else:
7412 output.append(f"📊 Quality trend: {trend.upper()}")
7413 return output
7416def _format_quality_alerts(quality_data: dict) -> list[str]:
7417 """Format the quality alerts section."""
7418 output = []
7419 alerts = quality_data.get("alerts", [])
7420 if alerts:
7421 output.append(f"\n⚠️ Active Alerts ({len(alerts)}):")
7422 for i, alert in enumerate(alerts, 1):
7423 if "URGENT" in alert:
7424 output.append(f" {i}. 🚨 {alert}")
7425 elif "WARNING" in alert:
7426 output.append(f" {i}. ⚠️ {alert}")
7427 else:
7428 output.append(f" {i}. 💡 {alert}")
7429 else:
7430 output.append("\n✅ No quality alerts - system healthy")
7431 return output
7434def _format_proactive_recommendations(quality_data: dict) -> list[str]:
7435 """Format the proactive recommendations section."""
7436 output = []
7437 if quality_data.get("recommend_checkpoint"):
7438 output.append("\n🔄 IMMEDIATE ACTION RECOMMENDED:")
7439 output.append(" • Run checkpoint to address quality issues")
7440 output.append(" • Review recent workflow changes")
7441 output.append(" • Consider conversation cleanup if needed")
7442 return output
7445async def _format_conversation_summary() -> list[str]:
7446 """Format the conversation summary section."""
7447 output = []
7448 try:
7449 conversation_summary = await summarize_current_conversation()
7450 if conversation_summary["key_topics"]:
7451 output.append("\n💬 Current Session Focus:")
7452 for topic in conversation_summary["key_topics"][:3]:
7453 output.append(f" • {topic}")
7455 if conversation_summary["decisions_made"]:
7456 output.append("\n✅ Key Decisions:")
7457 for decision in conversation_summary["decisions_made"][:2]:
7458 output.append(f" • {decision}")
7459 except Exception:
7460 pass
7461 return output
7464def _format_monitor_usage_guidance() -> list[str]:
7465 """Format the monitor usage guidance section."""
7466 return [
7467 "\n💡 Monitor Usage:",
7468 " • Run quality_monitor between checkpoints",
7469 " • Watch for declining trends",
7470 " • Act on urgent/warning alerts immediately",
7471 ]
@mcp.tool()
async def quality_monitor() -> str:
    """Phase 3: Proactive quality monitoring with early warning system.

    Returns:
        A report assembled from the status, trend, alert, recommendation,
        conversation-summary and usage sections. If monitoring raises, the
        banner plus a failure notice and a fallback hint are returned.
    """
    report = ["📊 Proactive Quality Monitor", "=" * 50]

    try:
        quality_data = await monitor_proactive_quality()

        # Sections are appended in display order.
        report += _format_monitoring_status(quality_data)
        report += _format_quality_trend(quality_data)
        report += _format_quality_alerts(quality_data)
        report += _format_proactive_recommendations(quality_data)
        report += await _format_conversation_summary()
        report += _format_monitor_usage_guidance()
    except Exception as exc:
        report.append(f"❌ Quality monitoring failed: {exc}")
        report.append("💡 Try running a regular checkpoint instead")

    return "\n".join(report)
@mcp.prompt("quality-monitor")
async def quality_monitor_prompt() -> str:
    """Proactive session quality monitoring with trend analysis and early warnings.

    Returns:
        The report produced by ``quality_monitor``.
    """
    monitor_report = await quality_monitor()
    return monitor_report
@mcp.tool()
async def auto_compact() -> str:
    """Automatically trigger conversation compaction with intelligent summary.

    Summarizes the current conversation, stores that summary as a reflection
    (failure to store is reported but does not abort), and returns a message
    whose last line is ``/compact``.

    Returns:
        The compaction report, or the banner plus a failure notice when the
        summary step raises.
    """
    separator = "=" * 50
    report = ["🔄 Auto-Compaction Tool", separator]

    try:
        # Generate intelligent conversation summary for compaction
        summary = await summarize_current_conversation()

        report.append("📝 Preparing conversation compaction...")
        report.append("🎯 Key conversation elements to preserve:")

        topics = summary["key_topics"]
        if topics:
            report.append(f"• Topics: {', '.join(topics[:3])}")

        decisions = summary["decisions_made"]
        if decisions:
            report.append(f"• Decisions: {decisions[0]}")

        next_steps = summary["next_steps"]
        if next_steps:
            report.append(f"• Next steps: {next_steps[0]}")

        # Store the summary in the database
        try:
            db = await get_reflection_database()
            snapshot = (
                f"Pre-compaction session state: {', '.join(topics)}. "
                f"Key decisions: {', '.join(decisions)}. "
                f"Next steps: {', '.join(next_steps)}"
            )
            tags = [
                "auto-compact",
                "pre-compaction",
                "context-preservation",
                current_project or "unknown-project",
            ]
            await db.store_reflection(snapshot, tags)
            report.append("💾 Context preserved in reflection database")
        except Exception as exc:
            # Storage is best-effort; only the first 50 characters of the
            # error are shown.
            report.append(f"⚠️ Context preservation failed: {str(exc)[:50]}")

        # The key insight: Return a message that will trigger Claude to run /compact
        report += [
            "\n" + separator,
            "🚨 CONTEXT COMPACTION REQUIRED",
            separator,
            "📋 Session summary prepared and stored safely",
            "🔄 Claude will now automatically compact the conversation",
            "✅ All important context has been preserved",
        ]

        # The trailing command line is what is meant to trigger compaction.
        report.append("\n/compact")

    except Exception as exc:
        report.append(f"❌ Auto-compaction failed: {exc}")
        report.append("💡 Manual /compact recommended")

    return "\n".join(report)
@mcp.prompt("auto-compact")
async def auto_compact_prompt() -> str:
    """Automatically trigger conversation compaction with context preservation.

    Returns:
        The report produced by ``auto_compact``.
    """
    compaction_report = await auto_compact()
    return compaction_report
7582# Multi-Project Coordination Tools
@mcp.tool()
async def create_project_group(
    name: str,
    projects: list[str],
    description: str = "",
) -> str:
    """Create a new project group for multi-project coordination.

    Args:
        name: Name of the group
        projects: Projects that belong to the group
        description: Optional free-text description

    Returns:
        A Markdown confirmation with the group's name, projects, description
        and id, or an error message when coordination is unavailable or the
        creation raises.
    """
    # Initialize lazily, then re-check the module-level coordinator.
    if not multi_project_coordinator:
        await initialize_new_features()
    if not multi_project_coordinator:
        return "❌ Multi-project coordination not available"

    try:
        group = await multi_project_coordinator.create_project_group(
            name=name,
            projects=projects,
            description=description,
        )
        confirmation = [
            "✅ **Project Group Created**",
            "",
            f"**Group:** {group.name}",
            f"**Projects:** {', '.join(group.projects)}",
            f"**Description:** {group.description or 'None'}",
            f"**ID:** {group.id}",
            "",
            "The project group is now available for cross-project coordination and knowledge sharing.",
        ]
        return "\n".join(confirmation)

    except Exception as exc:
        return f"❌ Failed to create project group: {exc}"
@mcp.tool()
async def add_project_dependency(
    source_project: str,
    target_project: str,
    dependency_type: str,
    description: str = "",
) -> str:
    """Add a dependency relationship between projects.

    Args:
        source_project: Project on the source side of the relationship
        target_project: Project on the target side of the relationship
        dependency_type: Kind of dependency, passed through unchanged
        description: Optional free-text description

    Returns:
        A Markdown confirmation with the stored source, target, type and
        description, or an error message when coordination is unavailable or
        the call raises.
    """
    # Initialize lazily, then re-check the module-level coordinator.
    if not multi_project_coordinator:
        await initialize_new_features()
    if not multi_project_coordinator:
        return "❌ Multi-project coordination not available"

    try:
        dependency = await multi_project_coordinator.add_project_dependency(
            source_project=source_project,
            target_project=target_project,
            dependency_type=dependency_type,
            description=description,
        )
        confirmation = [
            "✅ **Project Dependency Added**",
            "",
            f"**Source:** {dependency.source_project}",
            f"**Target:** {dependency.target_project}",
            f"**Type:** {dependency.dependency_type}",
            f"**Description:** {dependency.description or 'None'}",
            "",
            "This relationship will be used for cross-project search and coordination.",
        ]
        return "\n".join(confirmation)

    except Exception as exc:
        return f"❌ Failed to add project dependency: {exc}"
@mcp.tool()
async def search_across_projects(
    query: str,
    current_project: str,
    limit: int = 10,
) -> str:
    """Search conversations across related projects.

    Args:
        query: Search text
        current_project: Project the search is issued from
        limit: Maximum number of results requested

    Returns:
        A numbered Markdown list of hits (origin, score, content capped at
        200 characters, timestamp), a "no results" notice, or an error
        message when coordination is unavailable or the search raises.
    """
    # Initialize lazily, then re-check the module-level coordinator.
    if not multi_project_coordinator:
        await initialize_new_features()
    if not multi_project_coordinator:
        return "❌ Multi-project coordination not available"

    try:
        results = await multi_project_coordinator.find_related_conversations(
            current_project=current_project,
            query=query,
            limit=limit,
        )
        if not results:
            return f"🔍 No results found for '{query}' across related projects"

        report = [f"🔍 **Cross-Project Search Results** ({len(results)} found)\n"]

        for position, hit in enumerate(results, start=1):
            if hit["is_current_project"]:
                origin = "📍 Current"
            else:
                origin = f"🔗 {hit['source_project']}"

            score = hit["score"]
            content = hit["content"]
            # The ellipsis is added only when the content was actually cut.
            ellipsis = "..." if len(content) > 200 else ""

            entry = [
                f"**{position}.** {origin}",
                f"**Score:** {score:.3f}",
                f"**Content:** {content[:200]}{ellipsis}",
                f"**Timestamp:** {hit.get('timestamp', 'Unknown')}",
                "---",
            ]
            report.append("\n".join(entry))

        return "\n".join(report)

    except Exception as exc:
        return f"❌ Search failed: {exc}"
@mcp.tool()
async def get_project_insights(projects: list[str], time_range_days: int = 30) -> str:
    """Get cross-project insights and collaboration opportunities.

    Args:
        projects: Projects to include in the analysis
        time_range_days: Size of the look-back window in days

    Returns:
        A Markdown report with per-project activity and up to five common
        patterns, a notice when neither is available, or an error message
        when coordination is unavailable or the lookup raises.
    """
    # Initialize lazily, then re-check the module-level coordinator.
    if not multi_project_coordinator:
        await initialize_new_features()
    if not multi_project_coordinator:
        return "❌ Multi-project coordination not available"

    try:
        insights = await multi_project_coordinator.get_cross_project_insights(
            projects=projects,
            time_range_days=time_range_days,
        )

        report = [f"📊 **Cross-Project Insights** (Last {time_range_days} days)\n"]

        # Project activity
        activity = insights["project_activity"]
        if activity:
            report.append("**📈 Project Activity:**")
            for project_name, stats in activity.items():
                conversations = stats["conversation_count"]
                last_active = stats.get("last_activity", "Unknown")
                report.append(
                    f"• **{project_name}:** {conversations} conversations, last active: {last_active}",
                )
            report.append("")

        # Common patterns
        patterns = insights["common_patterns"]
        if patterns:
            report.append("**🔍 Common Patterns:**")
            for pattern in patterns[:5]:  # Top 5
                involved = ", ".join(pattern["projects"])
                report.append(
                    f"• **{pattern['pattern']}** across {involved} (frequency: {pattern['frequency']})",
                )
            report.append("")

        if not (activity or patterns):
            report.append("No insights available for the specified time range.")

        return "\n".join(report)

    except Exception as exc:
        return f"❌ Failed to get insights: {exc}"
7738# Advanced Search Tools
@mcp.tool()
async def advanced_search(
    query: str,
    content_type: str | None = None,
    project: str | None = None,
    timeframe: str | None = None,
    sort_by: str = "relevance",
    limit: int = 10,
) -> str:
    """Perform advanced search with faceted filtering.

    Args:
        query: Text to search for.
        content_type: When set, adds an equality filter on ``content_type``.
        project: When set, adds an equality filter on ``project``.
        timeframe: When set, it is parsed by the search engine into a
            ``(start, end)`` pair and added as a ``timestamp`` range filter.
        sort_by: Sort key forwarded to the search engine.
        limit: Result limit forwarded to the search engine.

    Returns:
        A Markdown-formatted result list, a "no results" notice, or an error
        message. Exceptions are reported in the returned string, not raised.
    """
    if not advanced_search_engine:
        await initialize_new_features()
        if not advanced_search_engine:
            return "❌ Advanced search not available"

    try:
        filters = []

        if content_type or project or timeframe:
            # Import once for all three filters. Binding SearchFilter only in
            # the content_type branch leaves it unbound (UnboundLocalError)
            # for calls that pass just a project or a timeframe.
            from session_mgmt_mcp.advanced_search import SearchFilter

            # Add content type filter
            if content_type:
                filters.append(
                    SearchFilter(
                        field="content_type",
                        operator="eq",
                        value=content_type,
                    ),
                )

            # Add project filter
            if project:
                filters.append(
                    SearchFilter(field="project", operator="eq", value=project),
                )

            # Add timeframe filter
            if timeframe:
                start_time, end_time = advanced_search_engine._parse_timeframe(
                    timeframe,
                )
                filters.append(
                    SearchFilter(
                        field="timestamp",
                        operator="range",
                        value=(start_time, end_time),
                    ),
                )

        # Perform search
        search_results = await advanced_search_engine.search(
            query=query,
            filters=filters,
            sort_by=sort_by,
            limit=limit,
            include_highlights=True,
        )

        results = search_results["results"]
        if not results:
            return f"🔍 No results found for '{query}'"

        output = [f"🔍 **Advanced Search Results** ({len(results)} found)\n"]

        for i, result in enumerate(results, 1):
            output.append(
                f"**{i}.** {result.title}\n"
                f"**Score:** {result.score:.3f} | **Project:** {result.project or 'Unknown'}\n"
                f"**Content:** {result.content}\n"
                f"**Timestamp:** {result.timestamp}",
            )

            if result.highlights:
                output.append(f"**Highlights:** {'; '.join(result.highlights)}")

            output.append("---")

        return "\n".join(output)

    except Exception as e:
        return f"❌ Advanced search failed: {e}"
@mcp.tool()
async def search_suggestions(query: str, field: str = "content", limit: int = 5) -> str:
    """Return completion suggestions for a partial search query.

    When the advanced search engine is not set, the optional features are
    initialized first. Each suggestion is rendered as a numbered line with
    its frequency. Exceptions are reported in the returned string.
    """
    if not advanced_search_engine:
        await initialize_new_features()
        if not advanced_search_engine:
            return "❌ Advanced search not available"

    try:
        completions = await advanced_search_engine.suggest_completions(
            query=query,
            field=field,
            limit=limit,
        )
        if not completions:
            return f"💡 No suggestions found for '{query}'"

        header = f"💡 **Search Suggestions** for '{query}':\n"
        numbered = [
            f"{rank}. {item['text']} (frequency: {item['frequency']})"
            for rank, item in enumerate(completions, 1)
        ]
        return "\n".join([header, *numbered])
    except Exception as e:
        return f"❌ Failed to get suggestions: {e}"
@mcp.tool()
async def get_search_metrics(metric_type: str, timeframe: str = "30d") -> str:
    """Report aggregated search/activity metrics for a timeframe.

    When the advanced search engine is not set, the optional features are
    initialized first. An ``error`` entry in the engine's response is
    returned as the message; otherwise at most the first ten data rows are
    listed. Exceptions are reported in the returned string.
    """
    if not advanced_search_engine:
        await initialize_new_features()
        if not advanced_search_engine:
            return "❌ Advanced search not available"

    try:
        metrics = await advanced_search_engine.aggregate_metrics(
            metric_type=metric_type,
            timeframe=timeframe,
        )
        if "error" in metrics:
            return f"❌ {metrics['error']}"

        lines = [f"📊 **{metric_type.title()} Metrics** ({timeframe})\n"]
        rows = metrics["data"]
        # Only the first ten rows are shown.
        lines.extend(f"• **{row['key']}:** {row['value']}" for row in rows[:10])
        if not rows:
            lines.append("No data available for the specified timeframe.")

        return "\n".join(lines)
    except Exception as e:
        return f"❌ Failed to get metrics: {e}"
7876# Git Worktree Management Tools
@mcp.tool()
async def git_worktree_list(working_directory: str | None = None) -> str:
    """List the git worktrees of the repository at ``working_directory``.

    Falls back to the process's current directory when no directory is given.
    Each worktree is shown with its branch, path and status flags. Failures
    are logged through ``session_logger`` and reported in the returned string.
    """
    from .worktree_manager import WorktreeManager

    working_dir = Path(working_directory or os.getcwd())
    manager = WorktreeManager(session_logger=session_logger)

    try:
        result = await manager.list_worktrees(working_dir)
        if not result["success"]:
            return f"❌ {result['error']}"

        worktrees = result["worktrees"]
        if not worktrees:
            return (
                "📝 No worktrees found. This repository only has the main working tree."
            )

        lines = [
            f"🌿 **Git Worktrees** ({result['total_count']} total)\n",
            f"📂 Repository: {working_dir.name}",
            f"🎯 Current: {result.get('current_worktree', 'Unknown')}\n",
        ]

        for wt in worktrees:
            bullet = "🔸" if wt["is_current"] else "◦"
            main_tag = " (main)" if wt["is_main"] else ""
            detached_tag = " (detached)" if wt["is_detached"] else ""
            lines.append(f"{bullet} **{wt['branch']}{main_tag}{detached_tag}**")
            lines.append(f" 📁 {wt['path']}")

            # (condition, label) pairs, in the order they are displayed.
            flags = (
                (wt["locked"], "🔒 locked"),
                (wt["prunable"], "🗑️ prunable"),
                (not wt["exists"], "❌ missing"),
                (wt["has_session"], "🧠 has session"),
            )
            active = [label for is_set, label in flags if is_set]
            if active:
                lines.append(f" Status: {', '.join(active)}")
            lines.append("")

        return "\n".join(lines)
    except Exception as e:
        session_logger.exception(f"git_worktree_list failed: {e}")
        return f"❌ Failed to list worktrees: {e}"
@mcp.tool()
async def git_worktree_add(
    branch: str,
    path: str,
    working_directory: str | None = None,
    create_branch: bool = False,
) -> str:
    """Create a git worktree for ``branch`` at ``path``.

    The repository defaults to the process's current directory. A relative
    ``path`` is resolved against the parent of the repository directory.
    Failures are logged through ``session_logger`` and reported in the
    returned string.
    """
    from .worktree_manager import WorktreeManager

    working_dir = Path(working_directory or os.getcwd())
    target = Path(path)
    # Relative targets are placed next to the repository, not inside it.
    new_path = target if target.is_absolute() else working_dir.parent / path

    manager = WorktreeManager(session_logger=session_logger)

    try:
        result = await manager.create_worktree(
            repository_path=working_dir,
            new_path=new_path,
            branch=branch,
            create_branch=create_branch,
        )
        if not result["success"]:
            return f"❌ {result['error']}"

        lines = [
            "🎉 **Worktree Created Successfully!**\n",
            f"🌿 Branch: {result['branch']}",
            f"📁 Path: {result['worktree_path']}",
            f"🎯 Created new branch: {'Yes' if create_branch else 'No'}",
        ]

        git_output = result.get("output")
        if git_output:
            lines.append(f"\n📝 Git output: {git_output}")

        lines.extend(
            (
                f"\n💡 To start working: cd {result['worktree_path']}",
                "💡 Use `git_worktree_list` to see all worktrees",
            ),
        )
        return "\n".join(lines)
    except Exception as e:
        session_logger.exception(f"git_worktree_add failed: {e}")
        return f"❌ Failed to create worktree: {e}"
@mcp.tool()
async def git_worktree_remove(
    path: str,
    working_directory: str | None = None,
    force: bool = False,
) -> str:
    """Remove the git worktree located at ``path``.

    The repository defaults to the process's current directory. A relative
    ``path`` is resolved against the parent of the repository directory.
    Failures are logged through ``session_logger`` and reported in the
    returned string.
    """
    from .worktree_manager import WorktreeManager

    working_dir = Path(working_directory or os.getcwd())
    target = Path(path)
    # Relative targets are looked up next to the repository, not inside it.
    remove_path = target if target.is_absolute() else working_dir.parent / path

    manager = WorktreeManager(session_logger=session_logger)

    try:
        result = await manager.remove_worktree(
            repository_path=working_dir,
            worktree_path=remove_path,
            force=force,
        )
        if not result["success"]:
            return f"❌ {result['error']}"

        lines = [
            "🗑️ **Worktree Removed Successfully!**\n",
            f"📁 Removed path: {result['removed_path']}",
        ]

        git_output = result.get("output")
        if git_output:
            lines.append(f"📝 Git output: {git_output}")

        lines.extend(
            (
                f"\n💡 Used force removal: {'Yes' if force else 'No'}",
                "💡 Use `git_worktree_list` to see remaining worktrees",
            ),
        )
        return "\n".join(lines)
    except Exception as e:
        session_logger.exception(f"git_worktree_remove failed: {e}")
        return f"❌ Failed to remove worktree: {e}"
@mcp.tool()
async def git_worktree_status(working_directory: str | None = None) -> str:
    """Describe the current worktree and summarize all worktrees of the repository.

    Falls back to the process's current directory when no directory is given.
    The report has three parts: the current worktree, a session/branch
    summary, and a numbered list of every worktree. Failures are logged
    through ``session_logger`` and reported in the returned string.
    """
    from .worktree_manager import WorktreeManager

    working_dir = Path(working_directory or os.getcwd())
    manager = WorktreeManager(session_logger=session_logger)

    try:
        result = await manager.get_worktree_status(working_dir)
        if not result["success"]:
            return f"❌ {result['error']}"

        current = result["current_worktree"]
        all_worktrees = result["all_worktrees"]
        summary = result["session_summary"]

        # Part 1: the worktree the caller is currently in.
        lines = [
            "🌿 **Git Worktree Status**\n",
            f"📂 Repository: {working_dir.name}",
            f"🎯 Current worktree: {current['branch']}{' (main)' if current['is_main'] else ' (worktree)'}",
            f"📁 Path: {current['path']}",
            f"🧠 Has session: {'Yes' if current['has_session'] else 'No'}",
            f"🔸 Detached HEAD: {'Yes' if current['is_detached'] else 'No'}\n",
        ]

        # Part 2: totals across every worktree.
        lines.extend(
            (
                "📊 **Multi-Worktree Summary:**",
                f"• Total worktrees: {result['total_worktrees']}",
                f"• Active sessions: {summary['active_sessions']}",
                f"• Unique branches: {summary['unique_branches']}",
                f"• Branches: {', '.join(summary['branches'])}\n",
            ),
        )

        # Part 3: one numbered entry per worktree.
        lines.append("🌳 **All Worktrees:**")
        for position, wt in enumerate(all_worktrees, 1):
            current_tag = " 👈 CURRENT" if wt["is_current"] else ""
            main_tag = " (main)" if wt["is_main"] else ""
            lines.append(f"{position}. **{wt['branch']}{main_tag}**{current_tag}")
            lines.append(f" 📁 {wt['path']}")

            flags = (
                (wt["has_session"], "🧠 session"),
                (wt["prunable"], "🗑️ prunable"),
                (not wt["exists"], "❌ missing"),
            )
            active = [label for is_set, label in flags if is_set]
            if active:
                lines.append(f" Status: {', '.join(active)}")
            lines.append("")

        lines.extend(
            (
                "💡 Use `git_worktree_list` for more details",
                "💡 Use `git_worktree_add <branch> <path>` to create new worktrees",
            ),
        )
        return "\n".join(lines)
    except Exception as e:
        session_logger.exception(f"git_worktree_status failed: {e}")
        return f"❌ Failed to get worktree status: {e}"
@mcp.tool()
async def git_worktree_prune(working_directory: str | None = None) -> str:
    """Prune stale worktree references of the repository at ``working_directory``.

    Falls back to the process's current directory when no directory is given.
    Reports how many references were pruned, or that none were found.
    Failures are logged through ``session_logger`` and reported in the
    returned string.
    """
    from .worktree_manager import WorktreeManager

    working_dir = Path(working_directory or os.getcwd())
    manager = WorktreeManager(session_logger=session_logger)

    try:
        result = await manager.prune_worktrees(working_dir)
        if not result["success"]:
            return f"❌ {result['error']}"

        lines = ["🧹 **Worktree Pruning Complete**\n"]

        if result["pruned_count"] > 0:
            lines.append(
                f"🗑️ Pruned {result['pruned_count']} stale worktree references",
            )
            details = result.get("output")
            if details:
                lines.append(f"📝 Details: {details}")
        else:
            lines.extend(
                (
                    "✅ No stale worktree references found",
                    "🎉 All worktrees are clean and up to date",
                ),
            )

        lines.append("\n💡 Use `git_worktree_list` to see current worktrees")
        return "\n".join(lines)
    except Exception as e:
        session_logger.exception(f"git_worktree_prune failed: {e}")
        return f"❌ Failed to prune worktrees: {e}"
def main() -> None:
    """Run the MCP server after a best-effort start of the optional features.

    ``initialize_new_features()`` is run to completion with ``asyncio.run``
    before the server starts. A failure there is logged and otherwise
    ignored, so the server still starts without the optional features.
    """
    try:
        asyncio.run(initialize_new_features())
    except Exception as exc:
        # Optional features must never block startup, but record the failure
        # instead of discarding it so a missing feature can be diagnosed.
        logging.getLogger("session_management").warning(
            "Optional feature initialization failed: %s",
            exc,
        )

    mcp.run()
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()