Coverage for session_mgmt_mcp/context_manager.py: 9.88%

236 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 05:22 -0700

1#!/usr/bin/env python3 

2"""Auto-Context Loading for Session Management MCP Server. 

3 

4Automatically detects current development context and loads relevant conversations. 

5""" 

6 

7import hashlib 

8import json 

9import os 

10from datetime import datetime, timedelta 

11from pathlib import Path 

12from typing import Any 

13 

14from .reflection_tools import ReflectionDatabase 

15from .utils.git_operations import get_worktree_info, list_worktrees 

16 

17 

18class ContextDetector: 

19 """Detects current development context from environment and files.""" 

20 

21 def __init__(self) -> None: 

22 self.context_indicators = { 

23 "git": [".git", ".gitignore", ".github"], 

24 "python": ["pyproject.toml", "setup.py", "requirements.txt", "*.py"], 

25 "javascript": ["package.json", "node_modules", "*.js", "*.ts"], 

26 "rust": ["Cargo.toml", "Cargo.lock", "*.rs"], 

27 "go": ["go.mod", "go.sum", "*.go"], 

28 "java": ["pom.xml", "build.gradle", "*.java"], 

29 "docker": ["Dockerfile", "docker-compose.yml", ".dockerignore"], 

30 "web": ["index.html", "*.css", "*.scss"], 

31 "testing": ["tests/", "test/", "*test*", "pytest.ini"], 

32 "documentation": ["README.md", "docs/", "*.md"], 

33 "config": [".env", ".envrc", "config/", "*.ini", "*.yaml", "*.yml"], 

34 } 

35 

36 self.project_types = { 

37 "mcp_server": ["mcp.json", ".mcp.json", "fastmcp"], 

38 "api": ["api/", "routes/", "endpoints/"], 

39 "web_app": ["templates/", "static/", "public/"], 

40 "cli_tool": ["cli/", "commands/", "__main__.py"], 

41 "library": ["src/", "lib/", "__init__.py"], 

42 "data_science": ["*.ipynb", "data/", "notebooks/"], 

43 "ml_project": ["model/", "models/", "training/", "*.pkl"], 

44 "devops": ["terraform/", "ansible/", "k8s/", "kubernetes/"], 

45 } 

46 

47 def _initialize_context(self, working_path: Path) -> dict[str, Any]: 

48 """Initialize basic context structure.""" 

49 return { 

50 "working_directory": str(working_path), 

51 "project_name": working_path.name, 

52 "detected_languages": [], 

53 "detected_tools": [], 

54 "project_type": None, 

55 "current_files": [], 

56 "recent_files": [], 

57 "git_info": {}, 

58 "worktree_info": None, 

59 "confidence_score": 0.0, 

60 } 

61 

62 def _find_indicators(self, working_path: Path, indicators: list[str]) -> list[str]: 

63 """Find matching indicators in the working directory.""" 

64 found_indicators = [] 

65 

66 for indicator in indicators: 

67 if indicator.startswith("*"): 

68 # Glob pattern 

69 matches = list(working_path.glob(indicator)) 

70 if matches: 

71 found_indicators.extend([m.name for m in matches[:3]]) # Limit to 3 

72 elif indicator.endswith("/"): 

73 # Directory 

74 if (working_path / indicator.rstrip("/")).exists(): 

75 found_indicators.append(indicator) 

76 # File 

77 elif (working_path / indicator).exists(): 

78 found_indicators.append(indicator) 

79 

80 return found_indicators 

81 

82 def _detect_languages_and_tools( 

83 self, 

84 working_path: Path, 

85 context: dict[str, Any], 

86 ) -> None: 

87 """Detect programming languages and development tools.""" 

88 for category, indicators in self.context_indicators.items(): 

89 found_indicators = self._find_indicators(working_path, indicators) 

90 

91 if found_indicators: 

92 if category in ["python", "javascript", "rust", "go", "java"]: 

93 context["detected_languages"].append(category) 

94 else: 

95 context["detected_tools"].append(category) 

96 context["confidence_score"] += 0.1 

97 

98 def _calculate_project_type_score( 

99 self, 

100 working_path: Path, 

101 indicators: list[str], 

102 ) -> float: 

103 """Calculate score for a specific project type.""" 

104 type_score = 0 

105 

106 for indicator in indicators: 

107 if indicator.startswith("*"): 

108 if list(working_path.glob(indicator)): 

109 type_score += 1 

110 elif indicator.endswith("/"): 

111 if (working_path / indicator.rstrip("/")).exists(): 

112 type_score += 1 

113 elif (working_path / indicator).exists(): 

114 type_score += 1 

115 elif indicator in str(working_path): # Check if it's in path name 

116 type_score += 0.5 

117 

118 return type_score 

119 

120 def _detect_project_type(self, working_path: Path, context: dict[str, Any]) -> None: 

121 """Detect the type of project.""" 

122 best_score = 0 

123 

124 for proj_type, indicators in self.project_types.items(): 

125 type_score = self._calculate_project_type_score(working_path, indicators) 

126 

127 if type_score > best_score: 

128 context["project_type"] = proj_type 

129 best_score = type_score 

130 

131 def _get_recent_files(self, working_path: Path) -> list[dict[str, Any]]: 

132 """Get recently modified files.""" 

133 recent_files = [] 

134 

135 try: 

136 recent_threshold = datetime.now() - timedelta(hours=2) 

137 

138 for file_path in working_path.rglob("*"): 

139 if file_path.is_file() and not self._should_ignore_file(file_path): 

140 mod_time = datetime.fromtimestamp(file_path.stat().st_mtime) 

141 

142 if mod_time > recent_threshold: 

143 recent_files.append( 

144 { 

145 "path": str(file_path.relative_to(working_path)), 

146 "modified": mod_time.isoformat(), 

147 "size": file_path.stat().st_size, 

148 }, 

149 ) 

150 

151 # Sort by modification time and return top 10 

152 recent_files.sort(key=lambda x: x["modified"], reverse=True) 

153 return recent_files[:10] 

154 

155 except (OSError, PermissionError): 

156 return [] 

157 

158 def detect_current_context(self, working_dir: str | None = None) -> dict[str, Any]: 

159 """Detect current development context.""" 

160 if not working_dir: 

161 working_dir = os.environ.get("PWD", os.getcwd()) 

162 

163 working_path = Path(working_dir) 

164 context = self._initialize_context(working_path) 

165 

166 # Detect languages and tools 

167 self._detect_languages_and_tools(working_path, context) 

168 

169 # Detect project type 

170 self._detect_project_type(working_path, context) 

171 

172 # Get recent files 

173 context["recent_files"] = self._get_recent_files(working_path) 

174 

175 # Get git information 

176 context["git_info"] = self._get_git_info(working_path) 

177 

178 # Get comprehensive worktree information 

179 worktree_info = get_worktree_info(working_path) 

180 if worktree_info: 

181 context["worktree_info"] = { 

182 "path": str(worktree_info.path), 

183 "branch": worktree_info.branch, 

184 "is_main_worktree": worktree_info.is_main_worktree, 

185 "is_detached": worktree_info.is_detached, 

186 "is_bare": worktree_info.is_bare, 

187 "locked": worktree_info.locked, 

188 "prunable": worktree_info.prunable, 

189 } 

190 

191 # Get list of all worktrees for cross-worktree context 

192 all_worktrees = list_worktrees(working_path) 

193 context["all_worktrees"] = [ 

194 { 

195 "path": str(wt.path), 

196 "branch": wt.branch, 

197 "is_main": wt.is_main_worktree, 

198 "is_current": wt.path == worktree_info.path, 

199 } 

200 for wt in all_worktrees 

201 ] 

202 

203 return context 

204 

205 def _should_ignore_file(self, file_path: Path) -> bool: 

206 """Check if file should be ignored.""" 

207 ignore_patterns = { 

208 ".git", 

209 ".venv", 

210 "__pycache__", 

211 "node_modules", 

212 ".pytest_cache", 

213 ".mypy_cache", 

214 ".ruff_cache", 

215 "dist", 

216 "build", 

217 ".DS_Store", 

218 } 

219 

220 # Check if any part of the path matches ignore patterns 

221 for part in file_path.parts: 

222 if part in ignore_patterns or (part.startswith(".") and len(part) > 4): 

223 return True 

224 

225 # Check file extensions to ignore 

226 ignore_extensions = {".pyc", ".pyo", ".log", ".tmp", ".cache"} 

227 return file_path.suffix in ignore_extensions 

228 

229 def _get_git_info(self, working_path: Path) -> dict[str, Any]: 

230 """Get git repository information.""" 

231 git_info = {} 

232 

233 git_dir = working_path / ".git" 

234 if git_dir.exists(): 

235 try: 

236 # Use new worktree-aware detection 

237 worktree_info = get_worktree_info(working_path) 

238 if worktree_info: 

239 git_info["current_branch"] = worktree_info.branch 

240 git_info["is_worktree"] = not worktree_info.is_main_worktree 

241 git_info["is_detached"] = worktree_info.is_detached 

242 git_info["worktree_path"] = str(worktree_info.path) 

243 else: 

244 # Fallback to old method 

245 head_file = git_dir / "HEAD" 

246 if head_file.exists(): 

247 head_content = head_file.read_text().strip() 

248 if head_content.startswith("ref: refs/heads/"): 

249 git_info["current_branch"] = head_content.split("/")[-1] 

250 

251 # Get remote info (simplified) 

252 config_file = git_dir / "config" 

253 if config_file.exists(): 

254 config_content = config_file.read_text() 

255 if "github.com" in config_content: 

256 git_info["platform"] = "github" 

257 elif "gitlab.com" in config_content: 

258 git_info["platform"] = "gitlab" 

259 else: 

260 git_info["platform"] = "git" 

261 

262 git_info["is_git_repo"] = True 

263 

264 except (OSError, PermissionError): 

265 pass 

266 

267 return git_info 

268 

269 

class RelevanceScorer:
    """Scores conversation relevance based on context.

    Produces a score in [0, 1] by summing weighted signals: project-name,
    language, tool, file and keyword matches plus recency.
    """

    def __init__(self) -> None:
        # Relative weight of each signal; a full match on every signal
        # sums to exactly 1.0.
        self.scoring_weights = {
            "project_name_match": 0.3,
            "language_match": 0.2,
            "tool_match": 0.15,
            "file_match": 0.15,
            "recency": 0.1,
            "keyword_match": 0.1,
        }

    def _split_weight(self, weight_key: str, count: int) -> float:
        """Return the per-item share of a weight split across *count* items."""
        return self.scoring_weights[weight_key] / count

    def _score_project_match(
        self,
        conv_content: str,
        conv_project: str,
        context: dict[str, Any],
    ) -> float:
        """Score based on project name matching."""
        name = context["project_name"].lower()
        matched = name in conv_project or name in conv_content
        return self.scoring_weights["project_name_match"] if matched else 0.0

    def _score_language_match(
        self,
        conv_content: str,
        context: dict[str, Any],
    ) -> float:
        """Score based on programming language matching."""
        languages = context["detected_languages"]
        if not languages:
            return 0.0
        share = self._split_weight("language_match", len(languages))
        return sum(share for lang in languages if lang in conv_content)

    def _score_tool_match(self, conv_content: str, context: dict[str, Any]) -> float:
        """Score based on development tool matching."""
        tools = context["detected_tools"]
        if not tools:
            return 0.0
        share = self._split_weight("tool_match", len(tools))
        return sum(share for tool in tools if tool in conv_content)

    def _score_file_match(self, conv_content: str, context: dict[str, Any]) -> float:
        """Score based on file name matching."""
        files = context["recent_files"]
        if not files:
            return 0.0
        share = self._split_weight("file_match", len(files))
        return sum(
            share
            for info in files
            if Path(info["path"]).name.lower() in conv_content
        )

    def _score_recency(self, conversation: dict[str, Any]) -> float:
        """Score based on conversation recency.

        Full weight for same-day conversations, half weight within a week,
        otherwise (or on an unparseable timestamp) zero.
        """
        try:
            conv_time = datetime.fromisoformat(conversation.get("timestamp", ""))
        except (ValueError, TypeError):
            return 0.0
        age = datetime.now() - conv_time
        if age.days == 0:
            return self.scoring_weights["recency"]
        if age.days <= 7:
            return self.scoring_weights["recency"] * 0.5
        return 0.0

    def _get_project_keywords(self) -> dict[str, list[str]]:
        """Get project type keyword mappings."""
        return {
            "mcp_server": ["mcp", "server", "fastmcp", "protocol"],
            "api": ["api", "endpoint", "route", "request", "response"],
            "web_app": ["web", "app", "frontend", "backend", "html", "css"],
            "cli_tool": ["cli", "command", "argument", "terminal"],
            "library": ["library", "package", "module", "import"],
            "data_science": ["data", "analysis", "pandas", "numpy", "jupyter"],
            "ml_project": ["machine learning", "model", "training", "neural"],
            "devops": ["deploy", "infrastructure", "docker", "kubernetes"],
        }

    def _score_project_keywords(
        self,
        conv_content: str,
        context: dict[str, Any],
    ) -> float:
        """Score based on project type keywords."""
        proj_type = context.get("project_type")
        if not proj_type:
            return 0.0

        keywords = self._get_project_keywords().get(proj_type, [])
        if not keywords:
            return 0.0

        share = self._split_weight("keyword_match", len(keywords))
        return sum(share for keyword in keywords if keyword in conv_content)

    def score_conversation_relevance(
        self,
        conversation: dict[str, Any],
        context: dict[str, Any],
    ) -> float:
        """Score how relevant a conversation is to current context."""
        content = conversation.get("content", "").lower()
        project = conversation.get("project", "").lower()

        total = (
            self._score_project_match(content, project, context)
            + self._score_language_match(content, context)
            + self._score_tool_match(content, context)
            + self._score_file_match(content, context)
            + self._score_recency(conversation)
            + self._score_project_keywords(content, context)
        )

        return min(total, 1.0)  # Cap at 1.0

393 

394 

class AutoContextLoader:
    """Main class for automatic context loading.

    Combines context detection with relevance scoring over stored
    conversations, caching results per detected context fingerprint.
    """

    def __init__(self, reflection_db: ReflectionDatabase) -> None:
        self.reflection_db = reflection_db
        self.context_detector = ContextDetector()
        self.relevance_scorer = RelevanceScorer()
        # context-hash -> (cached_at, result); entries expire after cache_timeout.
        self.cache = {}
        self.cache_timeout = 300  # 5 minutes

    async def load_relevant_context(
        self,
        working_dir: str | None = None,
        max_conversations: int = 10,
        min_relevance: float = 0.3,
    ) -> dict[str, Any]:
        """Load relevant conversations based on current context.

        Args:
            working_dir: Directory to detect context from (default: cwd/$PWD).
            max_conversations: Maximum conversations returned.
            min_relevance: Minimum relevance score to include a conversation.

        Returns:
            Dict with the detected context, the top-scoring conversations,
            and counts/threshold metadata.
        """
        context = self.context_detector.detect_current_context(working_dir)
        cache_key = self._generate_context_hash(context)

        # Serve a fresh-enough cached result for the same detected context.
        cached = self.cache.get(cache_key)
        if cached is not None:
            stored_at, payload = cached
            if datetime.now() - stored_at < timedelta(seconds=self.cache_timeout):
                return payload

        matches = self._score_stored_conversations(context, min_relevance)

        # Highest relevance first, truncated to the requested count.
        matches.sort(key=lambda item: item["relevance_score"], reverse=True)
        selected = matches[:max_conversations]

        result = {
            "context": context,
            "relevant_conversations": selected,
            "total_found": len(matches),
            "loaded_count": len(selected),
            "min_relevance_threshold": min_relevance,
        }

        self.cache[cache_key] = (datetime.now(), result)
        return result

    def _score_stored_conversations(
        self,
        context: dict[str, Any],
        min_relevance: float,
    ) -> list[dict[str, Any]]:
        """Fetch every stored conversation and keep those scoring >= min_relevance."""
        matches: list[dict[str, Any]] = []

        # Best-effort: skip scoring entirely when no DB connection exists.
        conn = getattr(self.reflection_db, "conn", None)
        if not conn:
            return matches

        rows = conn.execute(
            "SELECT id, content, project, timestamp, metadata FROM conversations",
        ).fetchall()

        for conv_id, content, project, timestamp, metadata in rows:
            record = {
                "id": conv_id,
                "content": content,
                "project": project,
                "timestamp": timestamp,
                "metadata": json.loads(metadata) if metadata else {},
            }

            relevance = self.relevance_scorer.score_conversation_relevance(
                record,
                context,
            )
            if relevance >= min_relevance:
                record["relevance_score"] = relevance
                matches.append(record)

        return matches

    def _generate_context_hash(self, context: dict[str, Any]) -> str:
        """Generate hash for context caching."""
        # Only the fields that meaningfully identify a context participate
        # in the fingerprint; lists are sorted for a stable serialization.
        fingerprint = json.dumps(
            {
                "project_name": context["project_name"],
                "detected_languages": sorted(context["detected_languages"]),
                "detected_tools": sorted(context["detected_tools"]),
                "project_type": context.get("project_type"),
                "working_directory": context["working_directory"],
            },
            sort_keys=True,
        )
        # md5 is used purely as a cheap cache key, not for security.
        return hashlib.md5(fingerprint.encode()).hexdigest()[:12]

    async def get_context_summary(self, working_dir: str | None = None) -> str:
        """Get a human-readable summary of current context."""
        ctx = self.context_detector.detect_current_context(working_dir)

        lines = [
            f"📁 Project: {ctx['project_name']}",
            f"📂 Directory: {ctx['working_directory']}",
        ]

        if ctx["detected_languages"]:
            lines.append(f"💻 Languages: {', '.join(ctx['detected_languages'])}")

        if ctx["detected_tools"]:
            lines.append(f"🔧 Tools: {', '.join(ctx['detected_tools'])}")

        if ctx["project_type"]:
            lines.append(
                f"📋 Type: {ctx['project_type'].replace('_', ' ').title()}",
            )

        git_info = ctx["git_info"]
        if git_info.get("is_git_repo"):
            branch = git_info.get("current_branch", "unknown")
            platform = git_info.get("platform", "git")
            lines.append(f"🌿 Git: {branch} branch on {platform}")

        if ctx["recent_files"]:
            lines.append(
                f"📄 Recent files: {len(ctx['recent_files'])} modified in last 2 hours",
            )

        confidence = ctx["confidence_score"] * 100
        lines.append(f"🎯 Detection confidence: {confidence:.0f}%")

        return "\n".join(lines)