Coverage for session_buddy/context_manager.py: 94.64%

265 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""Auto-Context Loading for Session Management MCP Server. 

3 

4Automatically detects current development context and loads relevant conversations. 

5""" 

6 

7import hashlib 

8import json 

9import operator 

10import os 

11from datetime import datetime, timedelta 

12from pathlib import Path 

13from typing import Any 

14 

15from .reflection_tools import ReflectionDatabase 

16from .utils.git_operations import get_worktree_info, list_worktrees 

17 

18 

19class ContextDetector: 

20 """Detects current development context from environment and files.""" 

21 

22 def __init__(self) -> None: 

23 self.context_indicators = { 

24 "git": [".git", ".gitignore", ".github"], 

25 "python": ["pyproject.toml", "setup.py", "requirements.txt", "*.py"], 

26 "javascript": ["package.json", "node_modules", "*.js", "*.ts"], 

27 "rust": ["Cargo.toml", "Cargo.lock", "*.rs"], 

28 "go": ["go.mod", "go.sum", "*.go"], 

29 "java": ["pom.xml", "build.gradle", "*.java"], 

30 "docker": ["Dockerfile", "docker-compose.yml", ".dockerignore"], 

31 "web": ["index.html", "*.css", "*.scss"], 

32 "testing": ["tests/", "test/", "*test*", "pytest.ini"], 

33 "documentation": ["README.md", "docs/", "*.md"], 

34 "config": [".env", ".envrc", "config/", "*.ini", "*.yaml", "*.yml"], 

35 } 

36 

37 self.project_types = { 

38 "mcp_server": ["mcp.json", ".mcp.json", "fastmcp"], 

39 "api": ["api/", "routes/", "endpoints/"], 

40 "web_app": ["templates/", "static/", "public/"], 

41 "cli_tool": ["cli/", "commands/", "__main__.py"], 

42 "library": ["src/", "lib/", "__init__.py"], 

43 "data_science": ["*.ipynb", "data/", "notebooks/"], 

44 "ml_project": ["model/", "models/", "training/", "*.pkl"], 

45 "devops": ["terraform/", "ansible/", "k8s/", "kubernetes/"], 

46 } 

47 

48 def _initialize_context(self, working_path: Path) -> dict[str, Any]: 

49 """Initialize basic context structure.""" 

50 return { 

51 "working_directory": str(working_path), 

52 "project_name": working_path.name, 

53 "detected_languages": [], 

54 "detected_tools": [], 

55 "project_type": None, 

56 "current_files": [], 

57 "recent_files": [], 

58 "git_info": {}, 

59 "worktree_info": None, 

60 "confidence_score": 0.0, 

61 } 

62 

63 def _find_indicators(self, working_path: Path, indicators: list[str]) -> list[str]: 

64 """Find matching indicators in the working directory.""" 

65 found_indicators = [] 

66 

67 for indicator in indicators: 

68 if indicator.startswith("*"): 

69 # Glob pattern 

70 matches = list(working_path.glob(indicator)) 

71 if matches: 

72 found_indicators.extend([m.name for m in matches[:3]]) # Limit to 3 

73 elif indicator.endswith("/"): 

74 # Directory 

75 if (working_path / indicator.rstrip("/")).exists(): 

76 found_indicators.append(indicator) 

77 # File 

78 elif (working_path / indicator).exists(): 

79 found_indicators.append(indicator) 

80 

81 return found_indicators 

82 

83 def _detect_languages_and_tools( 

84 self, 

85 working_path: Path, 

86 context: dict[str, Any], 

87 ) -> None: 

88 """Detect programming languages and development tools.""" 

89 for category, indicators in self.context_indicators.items(): 

90 found_indicators = self._find_indicators(working_path, indicators) 

91 

92 if found_indicators: 

93 if category in {"python", "javascript", "rust", "go", "java"}: 

94 context["detected_languages"].append(category) 

95 else: 

96 context["detected_tools"].append(category) 

97 context["confidence_score"] += 0.1 

98 

99 def _calculate_project_type_score( 

100 self, 

101 working_path: Path, 

102 indicators: list[str], 

103 ) -> float: 

104 """Calculate score for a specific project type.""" 

105 type_score = 0.0 

106 

107 for indicator in indicators: 

108 if indicator.startswith("*"): 

109 if list(working_path.glob(indicator)): 109 ↛ 110line 109 didn't jump to line 110 because the condition on line 109 was never true

110 type_score += 1 

111 elif indicator.endswith("/"): 

112 if (working_path / indicator.rstrip("/")).exists(): 

113 type_score += 1 

114 elif (working_path / indicator).exists(): 

115 type_score += 1 

116 elif indicator in str(working_path): # Check if it's in path name 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true

117 type_score += 0.5 

118 

119 return type_score 

120 

121 def _detect_project_type(self, working_path: Path, context: dict[str, Any]) -> None: 

122 """Detect the type of project.""" 

123 best_score = 0.0 

124 

125 for proj_type, indicators in self.project_types.items(): 

126 type_score = self._calculate_project_type_score(working_path, indicators) 

127 

128 if type_score > best_score: 

129 context["project_type"] = proj_type 

130 best_score = type_score 

131 

132 def _get_recent_files(self, working_path: Path) -> list[dict[str, Any]]: 

133 """Get recently modified files.""" 

134 recent_files = [] 

135 

136 try: 

137 recent_threshold = datetime.now() - timedelta(hours=2) 

138 

139 for file_path in working_path.rglob("*"): 

140 if file_path.is_file() and not self._should_ignore_file(file_path): 

141 mod_time = datetime.fromtimestamp(file_path.stat().st_mtime) 

142 

143 if mod_time > recent_threshold: 143 ↛ 139line 143 didn't jump to line 139 because the condition on line 143 was always true

144 recent_files.append( 

145 { 

146 "path": str(file_path.relative_to(working_path)), 

147 "modified": mod_time.isoformat(), 

148 "size": file_path.stat().st_size, 

149 }, 

150 ) 

151 

152 # Sort by modification time and return top 10 

153 recent_files.sort(key=lambda x: str(x["modified"]), reverse=True) 

154 return recent_files[:10] 

155 

156 except (OSError, PermissionError): 

157 return [] 

158 

159 def detect_current_context(self, working_dir: str | None = None) -> dict[str, Any]: 

160 """Detect current development context.""" 

161 working_path = self._resolve_working_path(working_dir) 

162 context = self._initialize_context(working_path) 

163 

164 self._gather_project_context(working_path, context) 

165 self._gather_git_context(working_path, context) 

166 

167 return context 

168 

169 def _resolve_working_path(self, working_dir: str | None) -> Path: 

170 """Resolve the working directory path.""" 

171 if not working_dir: 

172 try: 

173 cwd = Path.cwd() 

174 except FileNotFoundError: 

175 cwd = Path.home() 

176 working_dir = os.environ.get("PWD", str(cwd)) 

177 return Path(working_dir) if working_dir else Path.home() 

178 

179 def _gather_project_context( 

180 self, 

181 working_path: Path, 

182 context: dict[str, Any], 

183 ) -> None: 

184 """Gather project-specific context information.""" 

185 self._detect_languages_and_tools(working_path, context) 

186 self._detect_project_type(working_path, context) 

187 context["recent_files"] = self._get_recent_files(working_path) 

188 

189 def _gather_git_context(self, working_path: Path, context: dict[str, Any]) -> None: 

190 """Gather Git and worktree context information.""" 

191 context["git_info"] = self._get_git_info(working_path) 

192 self._add_worktree_context(working_path, context) 

193 

194 def _add_worktree_context( 

195 self, 

196 working_path: Path, 

197 context: dict[str, Any], 

198 ) -> None: 

199 """Add worktree information to context.""" 

200 worktree_info = get_worktree_info(working_path) 

201 if worktree_info: 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true

202 context["worktree_info"] = self._format_worktree_info(worktree_info) 

203 context["all_worktrees"] = self._get_all_worktrees_info( 

204 working_path, 

205 worktree_info, 

206 ) 

207 

208 def _format_worktree_info(self, worktree_info: Any) -> dict[str, Any]: 

209 """Format worktree information for context.""" 

210 return { 

211 "path": str(worktree_info.path), 

212 "branch": worktree_info.branch, 

213 "is_main_worktree": worktree_info.is_main_worktree, 

214 "is_detached": worktree_info.is_detached, 

215 "is_bare": worktree_info.is_bare, 

216 "locked": worktree_info.locked, 

217 "prunable": worktree_info.prunable, 

218 } 

219 

220 def _get_all_worktrees_info( 

221 self, 

222 working_path: Path, 

223 current_worktree: Any, 

224 ) -> list[dict[str, Any]]: 

225 """Get information about all worktrees.""" 

226 all_worktrees = list_worktrees(working_path) 

227 return [ 

228 { 

229 "path": str(wt.path), 

230 "branch": wt.branch, 

231 "is_main": wt.is_main_worktree, 

232 "is_current": wt.path == current_worktree.path, 

233 } 

234 for wt in all_worktrees 

235 ] 

236 

237 def _should_ignore_file(self, file_path: Path) -> bool: 

238 """Check if file should be ignored.""" 

239 ignore_patterns = { 

240 ".git", 

241 ".venv", 

242 "__pycache__", 

243 "node_modules", 

244 ".pytest_cache", 

245 ".mypy_cache", 

246 ".ruff_cache", 

247 "dist", 

248 "build", 

249 ".DS_Store", 

250 } 

251 

252 # Check if any part of the path matches ignore patterns 

253 for part in file_path.parts: 

254 if part in ignore_patterns or (part.startswith(".") and len(part) > 4): 

255 return True 

256 

257 # Check file extensions to ignore 

258 ignore_extensions = {".pyc", ".pyo", ".log", ".tmp", ".cache"} 

259 return file_path.suffix in ignore_extensions 

260 

261 def _get_git_info(self, working_path: Path) -> dict[str, Any]: 

262 """Get git repository information.""" 

263 git_dir = working_path / ".git" 

264 if not git_dir.exists(): 

265 return {} 

266 

267 from contextlib import suppress 

268 

269 git_info: dict[str, Any] = {} 

270 with suppress(OSError, PermissionError): 

271 self._extract_branch_info(git_dir, git_info, working_path) 

272 self._extract_platform_info(git_dir, git_info) 

273 git_info["is_git_repo"] = "True" 

274 

275 return git_info 

276 

277 def _extract_branch_info( 

278 self, 

279 git_dir: Path, 

280 git_info: dict[str, Any], 

281 working_path: Path, 

282 ) -> None: 

283 """Extract git branch information using worktree-aware detection.""" 

284 worktree_info = get_worktree_info(working_path) 

285 if worktree_info: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true

286 self._populate_worktree_info(git_info, worktree_info) 

287 else: 

288 self._fallback_branch_detection(git_dir, git_info) 

289 

290 def _populate_worktree_info( 

291 self, 

292 git_info: dict[str, Any], 

293 worktree_info: Any, 

294 ) -> None: 

295 """Populate git info from worktree information.""" 

296 git_info["current_branch"] = worktree_info.branch 

297 git_info["is_worktree"] = str(not worktree_info.is_main_worktree) 

298 git_info["is_detached"] = str(worktree_info.is_detached) 

299 git_info["worktree_path"] = str(worktree_info.path) 

300 

301 def _fallback_branch_detection( 

302 self, 

303 git_dir: Path, 

304 git_info: dict[str, Any], 

305 ) -> None: 

306 """Fallback method for branch detection when worktree info unavailable.""" 

307 head_file = git_dir / "HEAD" 

308 if not head_file.exists(): 

309 return 

310 

311 head_content = head_file.read_text().strip() 

312 if head_content.startswith("ref: refs/heads/"): 312 ↛ exitline 312 didn't return from function '_fallback_branch_detection' because the condition on line 312 was always true

313 git_info["current_branch"] = head_content.split("/")[-1] 

314 

315 def _extract_platform_info(self, git_dir: Path, git_info: dict[str, Any]) -> None: 

316 """Extract git platform information from config.""" 

317 config_file = git_dir / "config" 

318 if not config_file.exists(): 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true

319 return 

320 

321 config_content = config_file.read_text() 

322 git_info["platform"] = self._determine_git_platform(config_content) 

323 

324 def _determine_git_platform(self, config_content: str) -> str: 

325 """Determine git platform from config content.""" 

326 if "github.com" in config_content: 

327 return "github" 

328 if "gitlab.com" in config_content: 

329 return "gitlab" 

330 return "git" 

331 

332 

class RelevanceScorer:
    """Scores conversation relevance based on context.

    Each dimension contributes a weighted share of its configured weight;
    the combined score is capped at 1.0.
    """

    def __init__(self) -> None:
        """Set up the per-dimension scoring weights."""
        self.scoring_weights = {
            "project_name_match": 0.3,
            "language_match": 0.2,
            "tool_match": 0.15,
            "file_match": 0.15,
            "recency": 0.1,
            "keyword_match": 0.1,
        }

    def _score_project_match(
        self,
        conv_content: str,
        conv_project: str,
        context: dict[str, Any],
    ) -> float:
        """Score based on project name matching."""
        current_project = context["project_name"].lower()
        matched = current_project in conv_project or current_project in conv_content
        return self.scoring_weights["project_name_match"] if matched else 0.0

    def _score_language_match(
        self,
        conv_content: str,
        context: dict[str, Any],
    ) -> float:
        """Score based on programming language matching."""
        languages = context["detected_languages"]
        return float(
            sum(
                self.scoring_weights["language_match"] / len(languages)
                for lang in languages
                if lang in conv_content
            )
        )

    def _score_tool_match(self, conv_content: str, context: dict[str, Any]) -> float:
        """Score based on development tool matching."""
        tools = context["detected_tools"]
        return float(
            sum(
                self.scoring_weights["tool_match"] / len(tools)
                for tool in tools
                if tool in conv_content
            )
        )

    def _score_file_match(self, conv_content: str, context: dict[str, Any]) -> float:
        """Score based on file name matching."""
        files = context["recent_files"]
        return float(
            sum(
                self.scoring_weights["file_match"] / len(files)
                for file_info in files
                if Path(file_info["path"]).name.lower() in conv_content
            )
        )

    def _score_recency(self, conversation: dict[str, Any]) -> float:
        """Score based on conversation recency (unparseable timestamps score 0)."""
        from contextlib import suppress

        with suppress(ValueError, TypeError):
            conv_time = datetime.fromisoformat(conversation.get("timestamp", ""))
            age = datetime.now() - conv_time
            if age.days == 0:
                return self.scoring_weights["recency"]
            if age.days <= 7:
                return self.scoring_weights["recency"] * 0.5
        return 0.0

    def _get_project_keywords(self) -> dict[str, list[str]]:
        """Return the project-type -> keyword list mapping."""
        return {
            "mcp_server": ["mcp", "server", "fastmcp", "protocol"],
            "api": ["api", "endpoint", "route", "request", "response"],
            "web_app": ["web", "app", "frontend", "backend", "html", "css"],
            "cli_tool": ["cli", "command", "argument", "terminal"],
            "library": ["library", "package", "module", "import"],
            "data_science": ["data", "analysis", "pandas", "numpy", "jupyter"],
            "ml_project": ["machine learning", "model", "training", "neural"],
            "devops": ["deploy", "infrastructure", "docker", "kubernetes"],
        }

    def _score_project_keywords(
        self,
        conv_content: str,
        context: dict[str, Any],
    ) -> float:
        """Score based on project-type keywords appearing in the content."""
        project_type = context.get("project_type")
        if not project_type:
            return 0.0

        keywords = self._get_project_keywords().get(project_type, [])
        return float(
            sum(
                self.scoring_weights["keyword_match"] / len(keywords)
                for keyword in keywords
                if keyword in conv_content
            )
        )

    def score_conversation_relevance(
        self,
        conversation: dict[str, Any],
        context: dict[str, Any],
    ) -> float:
        """Score how relevant a conversation is to current context."""
        conv_content = conversation.get("content", "").lower()
        conv_project = conversation.get("project", "").lower()

        total = (
            self._score_project_match(conv_content, conv_project, context)
            + self._score_language_match(conv_content, context)
            + self._score_tool_match(conv_content, context)
            + self._score_file_match(conv_content, context)
            + self._score_recency(conversation)
            + self._score_project_keywords(conv_content, context)
        )
        return min(total, 1.0)  # Cap at 1.0

456 

457 

458class AutoContextLoader: 

459 """Main class for automatic context loading.""" 

460 

461 def __init__(self, reflection_db: ReflectionDatabase) -> None: 

462 self.reflection_db = reflection_db 

463 self.context_detector = ContextDetector() 

464 self.relevance_scorer = RelevanceScorer() 

465 self.cache: dict[str, Any] = {} 

466 self.cache_timeout = 300 # 5 minutes 

467 

468 async def load_relevant_context( 

469 self, 

470 working_dir: str | None = None, 

471 max_conversations: int = 10, 

472 min_relevance: float = 0.3, 

473 ) -> dict[str, Any]: 

474 """Load relevant conversations based on current context.""" 

475 # Detect current context 

476 current_context = self.context_detector.detect_current_context(working_dir) 

477 

478 # Generate cache key based on context 

479 context_hash = self._generate_context_hash(current_context) 

480 

481 # Check cache 

482 if context_hash in self.cache: 

483 cached_time, cached_result = self.cache[context_hash] 

484 if datetime.now() - cached_time < timedelta(seconds=self.cache_timeout): 484 ↛ 488line 484 didn't jump to line 488 because the condition on line 484 was always true

485 return cached_result # type: ignore[no-any-return] 

486 

487 # Get all conversations from database 

488 relevant_conversations = [] 

489 

490 if hasattr(self.reflection_db, "conn") and self.reflection_db.conn: 

491 cursor = self.reflection_db.conn.execute( 

492 "SELECT id, content, project, timestamp, metadata FROM conversations", 

493 ) 

494 conversations = cursor.fetchall() 

495 

496 for conv in conversations: 

497 conv_id, content, project, timestamp, metadata = conv 

498 

499 conversation_data = { 

500 "id": conv_id, 

501 "content": content, 

502 "project": project, 

503 "timestamp": timestamp, 

504 "metadata": json.loads(metadata) if metadata else {}, 

505 } 

506 

507 # Score relevance 

508 relevance = self.relevance_scorer.score_conversation_relevance( 

509 conversation_data, 

510 current_context, 

511 ) 

512 

513 if relevance >= min_relevance: 

514 conversation_data["relevance_score"] = relevance 

515 relevant_conversations.append(conversation_data) 

516 

517 # Sort by relevance and limit results 

518 relevant_conversations.sort( 

519 key=operator.itemgetter("relevance_score"), reverse=True 

520 ) 

521 top_conversations = relevant_conversations[:max_conversations] 

522 

523 result = { 

524 "context": current_context, 

525 "relevant_conversations": top_conversations, 

526 "total_found": len(relevant_conversations), 

527 "loaded_count": len(top_conversations), 

528 "min_relevance_threshold": min_relevance, 

529 } 

530 

531 # Cache result 

532 self.cache[context_hash] = (datetime.now(), result) 

533 

534 return result 

535 

536 def _generate_context_hash(self, context: dict[str, Any]) -> str: 

537 """Generate hash for context caching.""" 

538 # Use key context elements for hashing 

539 hash_data = { 

540 "project_name": context["project_name"], 

541 "detected_languages": sorted(context["detected_languages"]), 

542 "detected_tools": sorted(context["detected_tools"]), 

543 "project_type": context.get("project_type"), 

544 "working_directory": context["working_directory"], 

545 } 

546 

547 hash_string = json.dumps(hash_data, sort_keys=True) 

548 return hashlib.md5(hash_string.encode(), usedforsecurity=False).hexdigest()[:12] 

549 

550 async def get_context_summary(self, working_dir: str | None = None) -> str: 

551 """Get a human-readable summary of current context.""" 

552 context = self.context_detector.detect_current_context(working_dir) 

553 

554 summary_parts = [] 

555 summary_parts.extend( 

556 ( 

557 f"📁 Project: {context['project_name']}", 

558 f"📂 Directory: {context['working_directory']}", 

559 ) 

560 ) 

561 

562 if context["detected_languages"]: 

563 langs = ", ".join(context["detected_languages"]) 

564 summary_parts.append(f"💻 Languages: {langs}") 

565 

566 if context["detected_tools"]: 

567 tools = ", ".join(context["detected_tools"]) 

568 summary_parts.append(f"🔧 Tools: {tools}") 

569 

570 if context["project_type"]: 570 ↛ 571line 570 didn't jump to line 571 because the condition on line 570 was never true

571 summary_parts.append( 

572 f"📋 Type: {context['project_type'].replace('_', ' ').title()}", 

573 ) 

574 

575 if context["git_info"].get("is_git_repo"): 

576 git_info = context["git_info"] 

577 branch = git_info.get("current_branch", "unknown") 

578 platform = git_info.get("platform", "git") 

579 summary_parts.append(f"🌿 Git: {branch} branch on {platform}") 

580 

581 if context["recent_files"]: 

582 count = len(context["recent_files"]) 

583 summary_parts.append(f"📄 Recent files: {count} modified in last 2 hours") 

584 

585 confidence = context["confidence_score"] * 100 

586 summary_parts.append(f"🎯 Detection confidence: {confidence:.0f}%") 

587 

588 return "\n".join(summary_parts)