Coverage for session_mgmt_mcp/context_manager.py: 9.88%
236 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
1#!/usr/bin/env python3
2"""Auto-Context Loading for Session Management MCP Server.
4Automatically detects current development context and loads relevant conversations.
5"""
7import hashlib
8import json
9import os
10from datetime import datetime, timedelta
11from pathlib import Path
12from typing import Any
14from .reflection_tools import ReflectionDatabase
15from .utils.git_operations import get_worktree_info, list_worktrees
18class ContextDetector:
19 """Detects current development context from environment and files."""
21 def __init__(self) -> None:
22 self.context_indicators = {
23 "git": [".git", ".gitignore", ".github"],
24 "python": ["pyproject.toml", "setup.py", "requirements.txt", "*.py"],
25 "javascript": ["package.json", "node_modules", "*.js", "*.ts"],
26 "rust": ["Cargo.toml", "Cargo.lock", "*.rs"],
27 "go": ["go.mod", "go.sum", "*.go"],
28 "java": ["pom.xml", "build.gradle", "*.java"],
29 "docker": ["Dockerfile", "docker-compose.yml", ".dockerignore"],
30 "web": ["index.html", "*.css", "*.scss"],
31 "testing": ["tests/", "test/", "*test*", "pytest.ini"],
32 "documentation": ["README.md", "docs/", "*.md"],
33 "config": [".env", ".envrc", "config/", "*.ini", "*.yaml", "*.yml"],
34 }
36 self.project_types = {
37 "mcp_server": ["mcp.json", ".mcp.json", "fastmcp"],
38 "api": ["api/", "routes/", "endpoints/"],
39 "web_app": ["templates/", "static/", "public/"],
40 "cli_tool": ["cli/", "commands/", "__main__.py"],
41 "library": ["src/", "lib/", "__init__.py"],
42 "data_science": ["*.ipynb", "data/", "notebooks/"],
43 "ml_project": ["model/", "models/", "training/", "*.pkl"],
44 "devops": ["terraform/", "ansible/", "k8s/", "kubernetes/"],
45 }
47 def _initialize_context(self, working_path: Path) -> dict[str, Any]:
48 """Initialize basic context structure."""
49 return {
50 "working_directory": str(working_path),
51 "project_name": working_path.name,
52 "detected_languages": [],
53 "detected_tools": [],
54 "project_type": None,
55 "current_files": [],
56 "recent_files": [],
57 "git_info": {},
58 "worktree_info": None,
59 "confidence_score": 0.0,
60 }
62 def _find_indicators(self, working_path: Path, indicators: list[str]) -> list[str]:
63 """Find matching indicators in the working directory."""
64 found_indicators = []
66 for indicator in indicators:
67 if indicator.startswith("*"):
68 # Glob pattern
69 matches = list(working_path.glob(indicator))
70 if matches:
71 found_indicators.extend([m.name for m in matches[:3]]) # Limit to 3
72 elif indicator.endswith("/"):
73 # Directory
74 if (working_path / indicator.rstrip("/")).exists():
75 found_indicators.append(indicator)
76 # File
77 elif (working_path / indicator).exists():
78 found_indicators.append(indicator)
80 return found_indicators
82 def _detect_languages_and_tools(
83 self,
84 working_path: Path,
85 context: dict[str, Any],
86 ) -> None:
87 """Detect programming languages and development tools."""
88 for category, indicators in self.context_indicators.items():
89 found_indicators = self._find_indicators(working_path, indicators)
91 if found_indicators:
92 if category in ["python", "javascript", "rust", "go", "java"]:
93 context["detected_languages"].append(category)
94 else:
95 context["detected_tools"].append(category)
96 context["confidence_score"] += 0.1
98 def _calculate_project_type_score(
99 self,
100 working_path: Path,
101 indicators: list[str],
102 ) -> float:
103 """Calculate score for a specific project type."""
104 type_score = 0
106 for indicator in indicators:
107 if indicator.startswith("*"):
108 if list(working_path.glob(indicator)):
109 type_score += 1
110 elif indicator.endswith("/"):
111 if (working_path / indicator.rstrip("/")).exists():
112 type_score += 1
113 elif (working_path / indicator).exists():
114 type_score += 1
115 elif indicator in str(working_path): # Check if it's in path name
116 type_score += 0.5
118 return type_score
120 def _detect_project_type(self, working_path: Path, context: dict[str, Any]) -> None:
121 """Detect the type of project."""
122 best_score = 0
124 for proj_type, indicators in self.project_types.items():
125 type_score = self._calculate_project_type_score(working_path, indicators)
127 if type_score > best_score:
128 context["project_type"] = proj_type
129 best_score = type_score
131 def _get_recent_files(self, working_path: Path) -> list[dict[str, Any]]:
132 """Get recently modified files."""
133 recent_files = []
135 try:
136 recent_threshold = datetime.now() - timedelta(hours=2)
138 for file_path in working_path.rglob("*"):
139 if file_path.is_file() and not self._should_ignore_file(file_path):
140 mod_time = datetime.fromtimestamp(file_path.stat().st_mtime)
142 if mod_time > recent_threshold:
143 recent_files.append(
144 {
145 "path": str(file_path.relative_to(working_path)),
146 "modified": mod_time.isoformat(),
147 "size": file_path.stat().st_size,
148 },
149 )
151 # Sort by modification time and return top 10
152 recent_files.sort(key=lambda x: x["modified"], reverse=True)
153 return recent_files[:10]
155 except (OSError, PermissionError):
156 return []
158 def detect_current_context(self, working_dir: str | None = None) -> dict[str, Any]:
159 """Detect current development context."""
160 if not working_dir:
161 working_dir = os.environ.get("PWD", os.getcwd())
163 working_path = Path(working_dir)
164 context = self._initialize_context(working_path)
166 # Detect languages and tools
167 self._detect_languages_and_tools(working_path, context)
169 # Detect project type
170 self._detect_project_type(working_path, context)
172 # Get recent files
173 context["recent_files"] = self._get_recent_files(working_path)
175 # Get git information
176 context["git_info"] = self._get_git_info(working_path)
178 # Get comprehensive worktree information
179 worktree_info = get_worktree_info(working_path)
180 if worktree_info:
181 context["worktree_info"] = {
182 "path": str(worktree_info.path),
183 "branch": worktree_info.branch,
184 "is_main_worktree": worktree_info.is_main_worktree,
185 "is_detached": worktree_info.is_detached,
186 "is_bare": worktree_info.is_bare,
187 "locked": worktree_info.locked,
188 "prunable": worktree_info.prunable,
189 }
191 # Get list of all worktrees for cross-worktree context
192 all_worktrees = list_worktrees(working_path)
193 context["all_worktrees"] = [
194 {
195 "path": str(wt.path),
196 "branch": wt.branch,
197 "is_main": wt.is_main_worktree,
198 "is_current": wt.path == worktree_info.path,
199 }
200 for wt in all_worktrees
201 ]
203 return context
205 def _should_ignore_file(self, file_path: Path) -> bool:
206 """Check if file should be ignored."""
207 ignore_patterns = {
208 ".git",
209 ".venv",
210 "__pycache__",
211 "node_modules",
212 ".pytest_cache",
213 ".mypy_cache",
214 ".ruff_cache",
215 "dist",
216 "build",
217 ".DS_Store",
218 }
220 # Check if any part of the path matches ignore patterns
221 for part in file_path.parts:
222 if part in ignore_patterns or (part.startswith(".") and len(part) > 4):
223 return True
225 # Check file extensions to ignore
226 ignore_extensions = {".pyc", ".pyo", ".log", ".tmp", ".cache"}
227 return file_path.suffix in ignore_extensions
229 def _get_git_info(self, working_path: Path) -> dict[str, Any]:
230 """Get git repository information."""
231 git_info = {}
233 git_dir = working_path / ".git"
234 if git_dir.exists():
235 try:
236 # Use new worktree-aware detection
237 worktree_info = get_worktree_info(working_path)
238 if worktree_info:
239 git_info["current_branch"] = worktree_info.branch
240 git_info["is_worktree"] = not worktree_info.is_main_worktree
241 git_info["is_detached"] = worktree_info.is_detached
242 git_info["worktree_path"] = str(worktree_info.path)
243 else:
244 # Fallback to old method
245 head_file = git_dir / "HEAD"
246 if head_file.exists():
247 head_content = head_file.read_text().strip()
248 if head_content.startswith("ref: refs/heads/"):
249 git_info["current_branch"] = head_content.split("/")[-1]
251 # Get remote info (simplified)
252 config_file = git_dir / "config"
253 if config_file.exists():
254 config_content = config_file.read_text()
255 if "github.com" in config_content:
256 git_info["platform"] = "github"
257 elif "gitlab.com" in config_content:
258 git_info["platform"] = "gitlab"
259 else:
260 git_info["platform"] = "git"
262 git_info["is_git_repo"] = True
264 except (OSError, PermissionError):
265 pass
267 return git_info
class RelevanceScorer:
    """Scores how relevant stored conversations are to the active context.

    Each signal contributes a fixed weight (split evenly across the items in
    its category); the combined score is capped at 1.0.
    """

    def __init__(self) -> None:
        # Relative weight of each relevance signal.
        self.scoring_weights = {
            "project_name_match": 0.3,
            "language_match": 0.2,
            "tool_match": 0.15,
            "file_match": 0.15,
            "recency": 0.1,
            "keyword_match": 0.1,
        }

    def _score_project_match(
        self,
        conv_content: str,
        conv_project: str,
        context: dict[str, Any],
    ) -> float:
        """Award the full project weight when the active project's name
        appears in the conversation's project field or content.
        """
        name = context["project_name"].lower()
        hit = name in conv_project or name in conv_content
        return self.scoring_weights["project_name_match"] if hit else 0.0

    def _score_language_match(
        self,
        conv_content: str,
        context: dict[str, Any],
    ) -> float:
        """Award a per-language share for each detected language mentioned."""
        languages = context["detected_languages"]
        if not languages:
            return 0.0
        share = self.scoring_weights["language_match"] / len(languages)
        return sum(share for lang in languages if lang in conv_content)

    def _score_tool_match(self, conv_content: str, context: dict[str, Any]) -> float:
        """Award a per-tool share for each detected tool mentioned."""
        tools = context["detected_tools"]
        if not tools:
            return 0.0
        share = self.scoring_weights["tool_match"] / len(tools)
        return sum(share for tool in tools if tool in conv_content)

    def _score_file_match(self, conv_content: str, context: dict[str, Any]) -> float:
        """Award a per-file share for each recent file name mentioned."""
        recent = context["recent_files"]
        if not recent:
            return 0.0
        share = self.scoring_weights["file_match"] / len(recent)
        return sum(
            share
            for info in recent
            if Path(info["path"]).name.lower() in conv_content
        )

    def _score_recency(self, conversation: dict[str, Any]) -> float:
        """Favor conversations from today; half credit within a week."""
        try:
            stamp = datetime.fromisoformat(conversation.get("timestamp", ""))
        except (ValueError, TypeError):
            # Missing or malformed timestamp contributes nothing.
            return 0.0
        age = datetime.now() - stamp
        if age.days == 0:
            return self.scoring_weights["recency"]
        if age.days <= 7:
            return self.scoring_weights["recency"] * 0.5
        return 0.0

    def _get_project_keywords(self) -> dict[str, list[str]]:
        """Return the vocabulary associated with each project archetype."""
        return {
            "mcp_server": ["mcp", "server", "fastmcp", "protocol"],
            "api": ["api", "endpoint", "route", "request", "response"],
            "web_app": ["web", "app", "frontend", "backend", "html", "css"],
            "cli_tool": ["cli", "command", "argument", "terminal"],
            "library": ["library", "package", "module", "import"],
            "data_science": ["data", "analysis", "pandas", "numpy", "jupyter"],
            "ml_project": ["machine learning", "model", "training", "neural"],
            "devops": ["deploy", "infrastructure", "docker", "kubernetes"],
        }

    def _score_project_keywords(
        self,
        conv_content: str,
        context: dict[str, Any],
    ) -> float:
        """Award a per-keyword share for project-type vocabulary matches."""
        if not context.get("project_type"):
            return 0.0
        keywords = self._get_project_keywords().get(context["project_type"], [])
        if not keywords:
            return 0.0
        share = self.scoring_weights["keyword_match"] / len(keywords)
        return sum(share for word in keywords if word in conv_content)

    def score_conversation_relevance(
        self,
        conversation: dict[str, Any],
        context: dict[str, Any],
    ) -> float:
        """Combine all weighted signals into a single score capped at 1.0."""
        content = conversation.get("content", "").lower()
        project = conversation.get("project", "").lower()

        total = (
            self._score_project_match(content, project, context)
            + self._score_language_match(content, context)
            + self._score_tool_match(content, context)
            + self._score_file_match(content, context)
            + self._score_recency(conversation)
            + self._score_project_keywords(content, context)
        )
        return min(total, 1.0)  # Cap at 1.0
class AutoContextLoader:
    """Main class for automatic context loading.

    Combines context detection and relevance scoring to surface stored
    conversations matching the current working directory, with a short-lived
    in-memory cache keyed by a hash of the detected context.
    """

    def __init__(self, reflection_db: ReflectionDatabase) -> None:
        self.reflection_db = reflection_db
        self.context_detector = ContextDetector()
        self.relevance_scorer = RelevanceScorer()
        # context hash -> (cached_at, result). Expired entries are pruned on
        # every write so the cache cannot grow without bound.
        self.cache: dict[str, tuple[datetime, dict[str, Any]]] = {}
        self.cache_timeout = 300  # seconds (5 minutes)

    async def load_relevant_context(
        self,
        working_dir: str | None = None,
        max_conversations: int = 10,
        min_relevance: float = 0.3,
    ) -> dict[str, Any]:
        """Load relevant conversations based on current context.

        Args:
            working_dir: Directory to analyze; defaults to the current one.
            max_conversations: Maximum number of conversations to return.
            min_relevance: Minimum relevance score (0.0-1.0) to include.

        Returns:
            Dict with the detected context, the top-scoring conversations,
            and bookkeeping counts/threshold.
        """
        # Detect current context and derive its cache key.
        current_context = self.context_detector.detect_current_context(working_dir)
        context_hash = self._generate_context_hash(current_context)

        # Serve from cache while the entry is still fresh.
        cached = self.cache.get(context_hash)
        if cached is not None:
            cached_time, cached_result = cached
            if datetime.now() - cached_time < timedelta(seconds=self.cache_timeout):
                return cached_result

        relevant_conversations = self._collect_relevant_conversations(
            current_context,
            min_relevance,
        )

        # Highest relevance first, truncated to the requested amount.
        relevant_conversations.sort(key=lambda x: x["relevance_score"], reverse=True)
        top_conversations = relevant_conversations[:max_conversations]

        result = {
            "context": current_context,
            "relevant_conversations": top_conversations,
            "total_found": len(relevant_conversations),
            "loaded_count": len(top_conversations),
            "min_relevance_threshold": min_relevance,
        }

        # Fix: evict expired entries before caching the new result so calls
        # across many distinct contexts do not leak memory indefinitely.
        self._prune_expired_cache()
        self.cache[context_hash] = (datetime.now(), result)

        return result

    def _collect_relevant_conversations(
        self,
        current_context: dict[str, Any],
        min_relevance: float,
    ) -> list[dict[str, Any]]:
        """Fetch all stored conversations and keep those scoring at least
        `min_relevance` (each gains a "relevance_score" key).
        """
        relevant_conversations: list[dict[str, Any]] = []

        # Best-effort: skip scoring entirely when no DB connection is open.
        if not (hasattr(self.reflection_db, "conn") and self.reflection_db.conn):
            return relevant_conversations

        cursor = self.reflection_db.conn.execute(
            "SELECT id, content, project, timestamp, metadata FROM conversations",
        )
        for conv_id, content, project, timestamp, metadata in cursor.fetchall():
            conversation_data = {
                "id": conv_id,
                "content": content,
                "project": project,
                "timestamp": timestamp,
                "metadata": json.loads(metadata) if metadata else {},
            }

            relevance = self.relevance_scorer.score_conversation_relevance(
                conversation_data,
                current_context,
            )
            if relevance >= min_relevance:
                conversation_data["relevance_score"] = relevance
                relevant_conversations.append(conversation_data)

        return relevant_conversations

    def _prune_expired_cache(self) -> None:
        """Drop cache entries older than the timeout (prevents unbounded growth)."""
        now = datetime.now()
        ttl = timedelta(seconds=self.cache_timeout)
        for key in [k for k, (ts, _) in self.cache.items() if now - ts >= ttl]:
            del self.cache[key]

    def _generate_context_hash(self, context: dict[str, Any]) -> str:
        """Generate a short, stable hash of the cache-relevant context fields."""
        # Only the fields that should invalidate the cache participate.
        hash_data = {
            "project_name": context["project_name"],
            "detected_languages": sorted(context["detected_languages"]),
            "detected_tools": sorted(context["detected_tools"]),
            "project_type": context.get("project_type"),
            "working_directory": context["working_directory"],
        }

        hash_string = json.dumps(hash_data, sort_keys=True)
        # MD5 is acceptable here: the digest is a cache key, not a security
        # boundary; sort_keys keeps it deterministic across runs.
        return hashlib.md5(hash_string.encode()).hexdigest()[:12]

    async def get_context_summary(self, working_dir: str | None = None) -> str:
        """Get a human-readable, emoji-prefixed summary of current context."""
        context = self.context_detector.detect_current_context(working_dir)

        summary_parts = []
        summary_parts.append(f"📁 Project: {context['project_name']}")
        summary_parts.append(f"📂 Directory: {context['working_directory']}")

        if context["detected_languages"]:
            langs = ", ".join(context["detected_languages"])
            summary_parts.append(f"💻 Languages: {langs}")

        if context["detected_tools"]:
            tools = ", ".join(context["detected_tools"])
            summary_parts.append(f"🔧 Tools: {tools}")

        if context["project_type"]:
            summary_parts.append(
                f"📋 Type: {context['project_type'].replace('_', ' ').title()}",
            )

        if context["git_info"].get("is_git_repo"):
            git_info = context["git_info"]
            branch = git_info.get("current_branch", "unknown")
            platform = git_info.get("platform", "git")
            summary_parts.append(f"🌿 Git: {branch} branch on {platform}")

        if context["recent_files"]:
            count = len(context["recent_files"])
            summary_parts.append(f"📄 Recent files: {count} modified in last 2 hours")

        confidence = context["confidence_score"] * 100
        summary_parts.append(f"🎯 Detection confidence: {confidence:.0f}%")

        return "\n".join(summary_parts)