Coverage for session_buddy / context_manager.py: 94.64%
265 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Auto-Context Loading for Session Management MCP Server.
4Automatically detects current development context and loads relevant conversations.
5"""
7import hashlib
8import json
9import operator
10import os
11from datetime import datetime, timedelta
12from pathlib import Path
13from typing import Any
15from .reflection_tools import ReflectionDatabase
16from .utils.git_operations import get_worktree_info, list_worktrees
19class ContextDetector:
20 """Detects current development context from environment and files."""
22 def __init__(self) -> None:
23 self.context_indicators = {
24 "git": [".git", ".gitignore", ".github"],
25 "python": ["pyproject.toml", "setup.py", "requirements.txt", "*.py"],
26 "javascript": ["package.json", "node_modules", "*.js", "*.ts"],
27 "rust": ["Cargo.toml", "Cargo.lock", "*.rs"],
28 "go": ["go.mod", "go.sum", "*.go"],
29 "java": ["pom.xml", "build.gradle", "*.java"],
30 "docker": ["Dockerfile", "docker-compose.yml", ".dockerignore"],
31 "web": ["index.html", "*.css", "*.scss"],
32 "testing": ["tests/", "test/", "*test*", "pytest.ini"],
33 "documentation": ["README.md", "docs/", "*.md"],
34 "config": [".env", ".envrc", "config/", "*.ini", "*.yaml", "*.yml"],
35 }
37 self.project_types = {
38 "mcp_server": ["mcp.json", ".mcp.json", "fastmcp"],
39 "api": ["api/", "routes/", "endpoints/"],
40 "web_app": ["templates/", "static/", "public/"],
41 "cli_tool": ["cli/", "commands/", "__main__.py"],
42 "library": ["src/", "lib/", "__init__.py"],
43 "data_science": ["*.ipynb", "data/", "notebooks/"],
44 "ml_project": ["model/", "models/", "training/", "*.pkl"],
45 "devops": ["terraform/", "ansible/", "k8s/", "kubernetes/"],
46 }
48 def _initialize_context(self, working_path: Path) -> dict[str, Any]:
49 """Initialize basic context structure."""
50 return {
51 "working_directory": str(working_path),
52 "project_name": working_path.name,
53 "detected_languages": [],
54 "detected_tools": [],
55 "project_type": None,
56 "current_files": [],
57 "recent_files": [],
58 "git_info": {},
59 "worktree_info": None,
60 "confidence_score": 0.0,
61 }
63 def _find_indicators(self, working_path: Path, indicators: list[str]) -> list[str]:
64 """Find matching indicators in the working directory."""
65 found_indicators = []
67 for indicator in indicators:
68 if indicator.startswith("*"):
69 # Glob pattern
70 matches = list(working_path.glob(indicator))
71 if matches:
72 found_indicators.extend([m.name for m in matches[:3]]) # Limit to 3
73 elif indicator.endswith("/"):
74 # Directory
75 if (working_path / indicator.rstrip("/")).exists():
76 found_indicators.append(indicator)
77 # File
78 elif (working_path / indicator).exists():
79 found_indicators.append(indicator)
81 return found_indicators
83 def _detect_languages_and_tools(
84 self,
85 working_path: Path,
86 context: dict[str, Any],
87 ) -> None:
88 """Detect programming languages and development tools."""
89 for category, indicators in self.context_indicators.items():
90 found_indicators = self._find_indicators(working_path, indicators)
92 if found_indicators:
93 if category in {"python", "javascript", "rust", "go", "java"}:
94 context["detected_languages"].append(category)
95 else:
96 context["detected_tools"].append(category)
97 context["confidence_score"] += 0.1
99 def _calculate_project_type_score(
100 self,
101 working_path: Path,
102 indicators: list[str],
103 ) -> float:
104 """Calculate score for a specific project type."""
105 type_score = 0.0
107 for indicator in indicators:
108 if indicator.startswith("*"):
109 if list(working_path.glob(indicator)): 109 ↛ 110line 109 didn't jump to line 110 because the condition on line 109 was never true
110 type_score += 1
111 elif indicator.endswith("/"):
112 if (working_path / indicator.rstrip("/")).exists():
113 type_score += 1
114 elif (working_path / indicator).exists():
115 type_score += 1
116 elif indicator in str(working_path): # Check if it's in path name 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true
117 type_score += 0.5
119 return type_score
121 def _detect_project_type(self, working_path: Path, context: dict[str, Any]) -> None:
122 """Detect the type of project."""
123 best_score = 0.0
125 for proj_type, indicators in self.project_types.items():
126 type_score = self._calculate_project_type_score(working_path, indicators)
128 if type_score > best_score:
129 context["project_type"] = proj_type
130 best_score = type_score
132 def _get_recent_files(self, working_path: Path) -> list[dict[str, Any]]:
133 """Get recently modified files."""
134 recent_files = []
136 try:
137 recent_threshold = datetime.now() - timedelta(hours=2)
139 for file_path in working_path.rglob("*"):
140 if file_path.is_file() and not self._should_ignore_file(file_path):
141 mod_time = datetime.fromtimestamp(file_path.stat().st_mtime)
143 if mod_time > recent_threshold: 143 ↛ 139line 143 didn't jump to line 139 because the condition on line 143 was always true
144 recent_files.append(
145 {
146 "path": str(file_path.relative_to(working_path)),
147 "modified": mod_time.isoformat(),
148 "size": file_path.stat().st_size,
149 },
150 )
152 # Sort by modification time and return top 10
153 recent_files.sort(key=lambda x: str(x["modified"]), reverse=True)
154 return recent_files[:10]
156 except (OSError, PermissionError):
157 return []
159 def detect_current_context(self, working_dir: str | None = None) -> dict[str, Any]:
160 """Detect current development context."""
161 working_path = self._resolve_working_path(working_dir)
162 context = self._initialize_context(working_path)
164 self._gather_project_context(working_path, context)
165 self._gather_git_context(working_path, context)
167 return context
169 def _resolve_working_path(self, working_dir: str | None) -> Path:
170 """Resolve the working directory path."""
171 if not working_dir:
172 try:
173 cwd = Path.cwd()
174 except FileNotFoundError:
175 cwd = Path.home()
176 working_dir = os.environ.get("PWD", str(cwd))
177 return Path(working_dir) if working_dir else Path.home()
179 def _gather_project_context(
180 self,
181 working_path: Path,
182 context: dict[str, Any],
183 ) -> None:
184 """Gather project-specific context information."""
185 self._detect_languages_and_tools(working_path, context)
186 self._detect_project_type(working_path, context)
187 context["recent_files"] = self._get_recent_files(working_path)
189 def _gather_git_context(self, working_path: Path, context: dict[str, Any]) -> None:
190 """Gather Git and worktree context information."""
191 context["git_info"] = self._get_git_info(working_path)
192 self._add_worktree_context(working_path, context)
194 def _add_worktree_context(
195 self,
196 working_path: Path,
197 context: dict[str, Any],
198 ) -> None:
199 """Add worktree information to context."""
200 worktree_info = get_worktree_info(working_path)
201 if worktree_info: 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true
202 context["worktree_info"] = self._format_worktree_info(worktree_info)
203 context["all_worktrees"] = self._get_all_worktrees_info(
204 working_path,
205 worktree_info,
206 )
208 def _format_worktree_info(self, worktree_info: Any) -> dict[str, Any]:
209 """Format worktree information for context."""
210 return {
211 "path": str(worktree_info.path),
212 "branch": worktree_info.branch,
213 "is_main_worktree": worktree_info.is_main_worktree,
214 "is_detached": worktree_info.is_detached,
215 "is_bare": worktree_info.is_bare,
216 "locked": worktree_info.locked,
217 "prunable": worktree_info.prunable,
218 }
220 def _get_all_worktrees_info(
221 self,
222 working_path: Path,
223 current_worktree: Any,
224 ) -> list[dict[str, Any]]:
225 """Get information about all worktrees."""
226 all_worktrees = list_worktrees(working_path)
227 return [
228 {
229 "path": str(wt.path),
230 "branch": wt.branch,
231 "is_main": wt.is_main_worktree,
232 "is_current": wt.path == current_worktree.path,
233 }
234 for wt in all_worktrees
235 ]
237 def _should_ignore_file(self, file_path: Path) -> bool:
238 """Check if file should be ignored."""
239 ignore_patterns = {
240 ".git",
241 ".venv",
242 "__pycache__",
243 "node_modules",
244 ".pytest_cache",
245 ".mypy_cache",
246 ".ruff_cache",
247 "dist",
248 "build",
249 ".DS_Store",
250 }
252 # Check if any part of the path matches ignore patterns
253 for part in file_path.parts:
254 if part in ignore_patterns or (part.startswith(".") and len(part) > 4):
255 return True
257 # Check file extensions to ignore
258 ignore_extensions = {".pyc", ".pyo", ".log", ".tmp", ".cache"}
259 return file_path.suffix in ignore_extensions
261 def _get_git_info(self, working_path: Path) -> dict[str, Any]:
262 """Get git repository information."""
263 git_dir = working_path / ".git"
264 if not git_dir.exists():
265 return {}
267 from contextlib import suppress
269 git_info: dict[str, Any] = {}
270 with suppress(OSError, PermissionError):
271 self._extract_branch_info(git_dir, git_info, working_path)
272 self._extract_platform_info(git_dir, git_info)
273 git_info["is_git_repo"] = "True"
275 return git_info
277 def _extract_branch_info(
278 self,
279 git_dir: Path,
280 git_info: dict[str, Any],
281 working_path: Path,
282 ) -> None:
283 """Extract git branch information using worktree-aware detection."""
284 worktree_info = get_worktree_info(working_path)
285 if worktree_info: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 self._populate_worktree_info(git_info, worktree_info)
287 else:
288 self._fallback_branch_detection(git_dir, git_info)
290 def _populate_worktree_info(
291 self,
292 git_info: dict[str, Any],
293 worktree_info: Any,
294 ) -> None:
295 """Populate git info from worktree information."""
296 git_info["current_branch"] = worktree_info.branch
297 git_info["is_worktree"] = str(not worktree_info.is_main_worktree)
298 git_info["is_detached"] = str(worktree_info.is_detached)
299 git_info["worktree_path"] = str(worktree_info.path)
301 def _fallback_branch_detection(
302 self,
303 git_dir: Path,
304 git_info: dict[str, Any],
305 ) -> None:
306 """Fallback method for branch detection when worktree info unavailable."""
307 head_file = git_dir / "HEAD"
308 if not head_file.exists():
309 return
311 head_content = head_file.read_text().strip()
312 if head_content.startswith("ref: refs/heads/"): 312 ↛ exitline 312 didn't return from function '_fallback_branch_detection' because the condition on line 312 was always true
313 git_info["current_branch"] = head_content.split("/")[-1]
315 def _extract_platform_info(self, git_dir: Path, git_info: dict[str, Any]) -> None:
316 """Extract git platform information from config."""
317 config_file = git_dir / "config"
318 if not config_file.exists(): 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true
319 return
321 config_content = config_file.read_text()
322 git_info["platform"] = self._determine_git_platform(config_content)
324 def _determine_git_platform(self, config_content: str) -> str:
325 """Determine git platform from config content."""
326 if "github.com" in config_content:
327 return "github"
328 if "gitlab.com" in config_content:
329 return "gitlab"
330 return "git"
class RelevanceScorer:
    """Scores conversation relevance based on context."""

    def __init__(self) -> None:
        # Relative weight of each relevance signal; weights sum to 1.0.
        self.scoring_weights = {
            "project_name_match": 0.3,
            "language_match": 0.2,
            "tool_match": 0.15,
            "file_match": 0.15,
            "recency": 0.1,
            "keyword_match": 0.1,
        }

    def _score_project_match(
        self,
        conv_content: str,
        conv_project: str,
        context: dict[str, Any],
    ) -> float:
        """Score based on project name matching."""
        name = context["project_name"].lower()
        hit = name in conv_project or name in conv_content
        return self.scoring_weights["project_name_match"] if hit else 0.0

    def _score_language_match(
        self,
        conv_content: str,
        context: dict[str, Any],
    ) -> float:
        """Score based on programming language matching."""
        languages = context["detected_languages"]
        weight = self.scoring_weights["language_match"]
        # Each matched language contributes an equal share of the weight.
        return sum(
            (weight / len(languages) for lang in languages if lang in conv_content),
            0.0,
        )

    def _score_tool_match(self, conv_content: str, context: dict[str, Any]) -> float:
        """Score based on development tool matching."""
        tools = context["detected_tools"]
        weight = self.scoring_weights["tool_match"]
        return sum(
            (weight / len(tools) for tool in tools if tool in conv_content),
            0.0,
        )

    def _score_file_match(self, conv_content: str, context: dict[str, Any]) -> float:
        """Score based on file name matching."""
        recent = context["recent_files"]
        weight = self.scoring_weights["file_match"]
        return sum(
            (
                weight / len(recent)
                for info in recent
                if Path(info["path"]).name.lower() in conv_content
            ),
            0.0,
        )

    def _score_recency(self, conversation: dict[str, Any]) -> float:
        """Score based on conversation recency (full weight today, half <= 7 days)."""
        try:
            conv_time = datetime.fromisoformat(conversation.get("timestamp", ""))
        except (ValueError, TypeError):
            # Missing or malformed timestamp: no recency credit.
            return 0.0
        age = datetime.now() - conv_time
        if age.days == 0:
            return self.scoring_weights["recency"]
        if age.days <= 7:
            return self.scoring_weights["recency"] * 0.5
        return 0.0

    def _get_project_keywords(self) -> dict[str, list[str]]:
        """Get project type keyword mappings."""
        return {
            "mcp_server": ["mcp", "server", "fastmcp", "protocol"],
            "api": ["api", "endpoint", "route", "request", "response"],
            "web_app": ["web", "app", "frontend", "backend", "html", "css"],
            "cli_tool": ["cli", "command", "argument", "terminal"],
            "library": ["library", "package", "module", "import"],
            "data_science": ["data", "analysis", "pandas", "numpy", "jupyter"],
            "ml_project": ["machine learning", "model", "training", "neural"],
            "devops": ["deploy", "infrastructure", "docker", "kubernetes"],
        }

    def _score_project_keywords(
        self,
        conv_content: str,
        context: dict[str, Any],
    ) -> float:
        """Score based on project type keywords."""
        proj_type = context.get("project_type")
        if not proj_type:
            return 0.0

        keywords = self._get_project_keywords().get(proj_type, [])
        weight = self.scoring_weights["keyword_match"]
        return sum(
            (weight / len(keywords) for kw in keywords if kw in conv_content),
            0.0,
        )

    def score_conversation_relevance(
        self,
        conversation: dict[str, Any],
        context: dict[str, Any],
    ) -> float:
        """Score how relevant a conversation is to current context (0.0-1.0)."""
        content = conversation.get("content", "").lower()
        project = conversation.get("project", "").lower()

        total = (
            self._score_project_match(content, project, context)
            + self._score_language_match(content, context)
            + self._score_tool_match(content, context)
            + self._score_file_match(content, context)
            + self._score_recency(conversation)
            + self._score_project_keywords(content, context)
        )
        return min(total, 1.0)  # Cap at 1.0
class AutoContextLoader:
    """Main class for automatic context loading."""

    def __init__(self, reflection_db: ReflectionDatabase) -> None:
        self.reflection_db = reflection_db
        self.context_detector = ContextDetector()
        self.relevance_scorer = RelevanceScorer()
        # Maps context hash -> (stored_at, result); entries expire lazily.
        self.cache: dict[str, Any] = {}
        self.cache_timeout = 300  # 5 minutes

    async def load_relevant_context(
        self,
        working_dir: str | None = None,
        max_conversations: int = 10,
        min_relevance: float = 0.3,
    ) -> dict[str, Any]:
        """Load relevant conversations based on current context.

        Results are cached per context fingerprint for `cache_timeout`
        seconds; a fresh cache hit short-circuits the database scan.
        """
        current_context = self.context_detector.detect_current_context(working_dir)
        cache_key = self._generate_context_hash(current_context)

        entry = self.cache.get(cache_key)
        if entry is not None:
            stored_at, cached_result = entry
            if datetime.now() - stored_at < timedelta(seconds=self.cache_timeout):
                return cached_result  # type: ignore[no-any-return]

        matches = self._collect_relevant_conversations(current_context, min_relevance)

        # Most relevant first, trimmed to the requested count.
        matches.sort(key=operator.itemgetter("relevance_score"), reverse=True)
        selected = matches[:max_conversations]

        result = {
            "context": current_context,
            "relevant_conversations": selected,
            "total_found": len(matches),
            "loaded_count": len(selected),
            "min_relevance_threshold": min_relevance,
        }

        self.cache[cache_key] = (datetime.now(), result)
        return result

    def _collect_relevant_conversations(
        self,
        context: dict[str, Any],
        min_relevance: float,
    ) -> list[dict[str, Any]]:
        """Score every stored conversation, keeping those above the threshold."""
        conn = getattr(self.reflection_db, "conn", None)
        if not conn:
            return []

        rows = conn.execute(
            "SELECT id, content, project, timestamp, metadata FROM conversations",
        ).fetchall()

        kept: list[dict[str, Any]] = []
        for conv_id, content, project, timestamp, metadata in rows:
            data = {
                "id": conv_id,
                "content": content,
                "project": project,
                "timestamp": timestamp,
                "metadata": json.loads(metadata) if metadata else {},
            }
            relevance = self.relevance_scorer.score_conversation_relevance(
                data,
                context,
            )
            if relevance >= min_relevance:
                data["relevance_score"] = relevance
                kept.append(data)
        return kept

    def _generate_context_hash(self, context: dict[str, Any]) -> str:
        """Generate a short, stable hash identifying the current context."""
        # Only the stable, identity-defining context fields participate.
        fingerprint = json.dumps(
            {
                "project_name": context["project_name"],
                "detected_languages": sorted(context["detected_languages"]),
                "detected_tools": sorted(context["detected_tools"]),
                "project_type": context.get("project_type"),
                "working_directory": context["working_directory"],
            },
            sort_keys=True,
        )
        digest = hashlib.md5(fingerprint.encode(), usedforsecurity=False)
        return digest.hexdigest()[:12]

    async def get_context_summary(self, working_dir: str | None = None) -> str:
        """Get a human-readable summary of current context."""
        context = self.context_detector.detect_current_context(working_dir)

        lines = [
            f"📁 Project: {context['project_name']}",
            f"📂 Directory: {context['working_directory']}",
        ]

        if context["detected_languages"]:
            langs = ", ".join(context["detected_languages"])
            lines.append(f"💻 Languages: {langs}")

        if context["detected_tools"]:
            tools = ", ".join(context["detected_tools"])
            lines.append(f"🔧 Tools: {tools}")

        if context["project_type"]:
            lines.append(
                f"📋 Type: {context['project_type'].replace('_', ' ').title()}",
            )

        git_info = context["git_info"]
        if git_info.get("is_git_repo"):
            branch = git_info.get("current_branch", "unknown")
            platform = git_info.get("platform", "git")
            lines.append(f"🌿 Git: {branch} branch on {platform}")

        if context["recent_files"]:
            count = len(context["recent_files"])
            lines.append(f"📄 Recent files: {count} modified in last 2 hours")

        confidence = context["confidence_score"] * 100
        lines.append(f"🎯 Detection confidence: {confidence:.0f}%")

        return "\n".join(lines)