Coverage for session_buddy / server_core.py: 35.48%

230 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""MCP Server Core Infrastructure. 

2 

3This module handles FastMCP initialization, server lifecycle management, 

4tool registration, and core infrastructure components. 

5 

6Extracted Components: 

7- SessionPermissionsManager class (singleton permissions management) 

8- Configuration functions (_load_mcp_config, _detect_other_mcp_servers, etc.) 

9- Session lifecycle handler (session_lifecycle) 

10- Initialization functions (initialize_new_features, analyze_project_context) 

11- Health and status functions (health_check, _add_basic_status_info, etc.) 

12- Quality formatting functions (_format_quality_results, _perform_git_checkpoint) 

13""" 

14 

15from __future__ import annotations 

16 

17import os 

18import shutil 

19import subprocess # nosec B404 

20import warnings 

21from contextlib import asynccontextmanager, suppress 

22from pathlib import Path 

23from typing import TYPE_CHECKING, Any 

24 

25if TYPE_CHECKING: 

26 from collections.abc import AsyncGenerator 

27 

28 from mcp_common.ui import ServerPanels 

29 from session_buddy.core.permissions import SessionPermissionsManager 

30 from session_buddy.utils.logging import SessionLogger 

31 

32# Re-export for backward compatibility in tests and integrations. 

33 

34# Suppress transformers warnings 

35os.environ["TRANSFORMERS_VERBOSITY"] = "error" 

36warnings.filterwarnings("ignore", message=".*PyTorch.*TensorFlow.*Flax.*") 

37 

38# Import mcp-common ServerPanels for beautiful terminal UI 

39try: 

40 from mcp_common.ui import ServerPanels 

41 

42 SERVERPANELS_AVAILABLE = True 

43except ImportError: 

44 SERVERPANELS_AVAILABLE = False 

45 

46try: 

47 import tomli 

48except ImportError: 

49 tomli = None # type: ignore[assignment] 

50 

51# Import extracted modules 

52 

53 

54# ===================================== 

55# Configuration and Detection Functions 

56# ===================================== 

57 

58 

59def _detect_other_mcp_servers() -> dict[str, bool]: 

60 """Detect availability of other MCP servers by checking common paths and processes.""" 

61 detected = {} 

62 

63 # Check for crackerjack MCP server 

64 try: 

65 # Try to import crackerjack to see if it's available 

66 result = subprocess.run( 

67 ["crackerjack", "--version"], 

68 check=False, 

69 capture_output=True, 

70 text=True, 

71 timeout=5, 

72 ) 

73 detected["crackerjack"] = result.returncode == 0 

74 except (subprocess.SubprocessError, FileNotFoundError, subprocess.TimeoutExpired): 

75 detected["crackerjack"] = False 

76 

77 return detected 

78 

79 

80def _generate_server_guidance(detected_servers: dict[str, bool]) -> list[str]: 

81 """Generate guidance messages based on detected servers.""" 

82 guidance = [] 

83 

84 if detected_servers.get("crackerjack"): 

85 guidance.extend( 

86 [ 

87 "💡 CRACKERJACK INTEGRATION DETECTED:", 

88 " Enhanced commands available for better development experience:", 

89 " • Use /session-buddy:crackerjack-run instead of /crackerjack:run", 

90 " • Gets memory, analytics, and intelligent insights automatically", 

91 " • View trends with /session-buddy:crackerjack-history", 

92 " • Analyze patterns with /session-buddy:crackerjack-patterns", 

93 ], 

94 ) 

95 

96 return guidance 

97 

98 

99def _load_mcp_config() -> dict[str, Any]: 

100 """Load MCP server configuration from pyproject.toml.""" 

101 # Look for pyproject.toml in the current project directory 

102 pyproject_path = Path.cwd() / "pyproject.toml" 

103 

104 # If not found in cwd, look in parent directories (up to 3 levels) 

105 if not pyproject_path.exists(): 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true

106 for parent in Path.cwd().parents[:3]: 

107 potential_path = parent / "pyproject.toml" 

108 if potential_path.exists(): 

109 pyproject_path = potential_path 

110 break 

111 

112 if not pyproject_path.exists() or not tomli: 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true

113 return { 

114 "http_port": 8678, 

115 "http_host": "127.0.0.1", 

116 "websocket_monitor_port": 8677, 

117 "http_enabled": False, 

118 } 

119 

120 try: 

121 with pyproject_path.open("rb") as f: 

122 pyproject_data = tomli.load(f) 

123 

124 session_config = pyproject_data.get("tool", {}).get("session-mgmt-mcp", {}) 

125 

126 return { 

127 "http_port": session_config.get("mcp_http_port", 8678), 

128 "http_host": session_config.get("mcp_http_host", "127.0.0.1"), 

129 "websocket_monitor_port": session_config.get( 

130 "websocket_monitor_port", 

131 8677, 

132 ), 

133 "http_enabled": session_config.get("http_enabled", False), 

134 } 

135 except Exception as e: 

136 if SERVERPANELS_AVAILABLE: 

137 ServerPanels.warning( 

138 title="Configuration Warning", 

139 message="Failed to load MCP config from pyproject.toml", 

140 details=[str(e), "Using default configuration values"], 

141 ) 

142 return { 

143 "http_port": 8678, 

144 "http_host": "127.0.0.1", 

145 "websocket_monitor_port": 8677, 

146 "http_enabled": False, 

147 } 

148 

149 

150# ===================================== 

151# Session Lifecycle Handler 

152# ===================================== 

153 

154 

155@asynccontextmanager 

156async def session_lifecycle( 

157 app: Any, 

158 lifecycle_manager: Any, 

159 session_logger: SessionLogger, 

160) -> AsyncGenerator[None]: 

161 """Automatic session lifecycle for git repositories only. 

162 

163 Args: 

164 app: FastMCP application instance 

165 lifecycle_manager: SessionLifecycleManager instance 

166 session_logger: SessionLogger instance 

167 

168 Yields: 

169 None during server lifetime 

170 

171 """ 

172 # Import here to avoid circular dependencies 

173 from session_buddy.utils.git_operations import get_git_root, is_git_repository 

174 

175 current_dir = Path.cwd() 

176 

177 # Only auto-initialize for git repositories 

178 if is_git_repository(current_dir): 

179 try: 

180 git_root = get_git_root(current_dir) 

181 session_logger.info(f"Git repository detected at {git_root}") 

182 

183 # Run the same logic as the start tool but with connection notification 

184 result = await lifecycle_manager.initialize_session(str(current_dir)) 

185 if result["success"]: 

186 session_logger.info("✅ Auto-initialized session for git repository") 

187 

188 # Import set_connection_info here to avoid circular dependency 

189 from session_buddy.advanced_features import set_connection_info 

190 

191 # Store connection info for display via tools 

192 connection_info = { 

193 "connected_at": "just connected", 

194 "project": result["project"], 

195 "quality_score": result["quality_score"], 

196 "previous_session": result.get("previous_session"), 

197 "recommendations": result["quality_data"].get( 

198 "recommendations", 

199 [], 

200 ), 

201 } 

202 set_connection_info(connection_info) 

203 else: 

204 session_logger.warning(f"Auto-init failed: {result['error']}") 

205 except Exception as e: 

206 session_logger.warning(f"Auto-init failed (non-critical): {e}") 

207 else: 

208 # Not a git repository - no auto-initialization 

209 session_logger.debug("Non-git directory - skipping auto-initialization") 

210 

211 yield # Server runs normally 

212 

213 # On disconnect - cleanup for git repos only 

214 if is_git_repository(current_dir): 

215 try: 

216 result = await lifecycle_manager.end_session() 

217 if result["success"]: 

218 session_logger.info("✅ Auto-ended session for git repository") 

219 else: 

220 session_logger.warning(f"Auto-cleanup failed: {result['error']}") 

221 except Exception as e: 

222 session_logger.warning(f"Auto-cleanup failed (non-critical): {e}") 

223 

224 

225# ===================================== 

226# Initialization Functions 

227# ===================================== 

228 

229 

230async def auto_setup_git_working_directory(session_logger: SessionLogger) -> None: 

231 """Auto-detect and setup git working directory for enhanced DX.""" 

232 try: 

233 # Get current working directory 

234 current_dir = Path.cwd() 

235 

236 # Import git utilities 

237 from session_buddy.utils.git_operations import ( 

238 get_git_root, 

239 is_git_repository, 

240 ) 

241 

242 # Try to find git root from current directory 

243 git_root = None 

244 if is_git_repository(current_dir): 

245 git_root = get_git_root(current_dir) 

246 

247 if git_root and git_root.exists(): 247 ↛ 249line 247 didn't jump to line 249 because the condition on line 247 was never true

248 # Log the auto-setup action for Claude to see 

249 session_logger.info(f"🔧 Auto-detected git repository: {git_root}") 

250 session_logger.info( 

251 f"💡 Recommend: Use `mcp__git__git_set_working_dir` with path='{git_root}'", 

252 ) 

253 

254 # Also log to stderr for immediate visibility 

255 if SERVERPANELS_AVAILABLE: 

256 ServerPanels.info( 

257 title="Git Repository Detected", 

258 message=f"Repository root: {git_root}", 

259 items={ 

260 "Auto-setup command": f"git_set_working_dir('{git_root}')", 

261 "Auto-lifecycle": "Enabled (init, checkpoint, cleanup)", 

262 }, 

263 ) 

264 else: 

265 session_logger.debug( 

266 "No git repository detected in current directory - skipping auto-setup", 

267 ) 

268 

269 except Exception as e: 

270 # Graceful fallback - don't break server startup 

271 session_logger.debug(f"Git auto-setup failed (non-critical): {e}") 

272 

273 

274async def initialize_new_features( 

275 session_logger: SessionLogger, 

276 multi_project_coordinator_ref: Any, 

277 advanced_search_engine_ref: Any, 

278 app_config_ref: Any, 

279) -> tuple[Any, Any, Any]: 

280 """Initialize multi-project coordination and advanced search features. 

281 

282 Args: 

283 session_logger: Logger instance for diagnostics 

284 multi_project_coordinator_ref: Reference to store coordinator instance 

285 advanced_search_engine_ref: Reference to store search engine instance 

286 app_config_ref: Reference to store configuration 

287 

288 Returns: 

289 Tuple of (multi_project_coordinator, advanced_search_engine, app_config) 

290 

291 """ 

292 # Import feature detection 

293 from session_buddy.core.features import get_feature_flags 

294 

295 _features = get_feature_flags() 

296 advanced_search_available = _features["ADVANCED_SEARCH_AVAILABLE"] 

297 config_available = _features["CONFIG_AVAILABLE"] 

298 multi_project_available = _features["MULTI_PROJECT_AVAILABLE"] 

299 reflection_tools_available = _features["REFLECTION_TOOLS_AVAILABLE"] 

300 

301 # Auto-setup git working directory for enhanced DX 

302 await auto_setup_git_working_directory(session_logger) 

303 

304 # Initialize default return values 

305 multi_project_coordinator = multi_project_coordinator_ref 

306 advanced_search_engine = advanced_search_engine_ref 

307 app_config = app_config_ref 

308 

309 # Load configuration 

310 if config_available: 

311 from session_buddy.settings import get_settings 

312 

313 app_config = get_settings() 

314 

315 # Initialize reflection database for new features 

316 if reflection_tools_available: 

317 with suppress( 

318 ImportError, 

319 ModuleNotFoundError, 

320 RuntimeError, 

321 AttributeError, 

322 OSError, 

323 ValueError, 

324 ): 

325 from session_buddy.reflection_tools import get_reflection_database 

326 

327 db = await get_reflection_database() 

328 

329 # Initialize multi-project coordinator 

330 if multi_project_available: 

331 from session_buddy.multi_project_coordinator import ( 

332 MultiProjectCoordinator, 

333 ) 

334 

335 multi_project_coordinator = MultiProjectCoordinator(db) 

336 

337 # Initialize advanced search engine 

338 if advanced_search_available: 

339 from session_buddy.advanced_search import AdvancedSearchEngine 

340 

341 # Type ignore: db is ReflectionDatabaseAdapterOneiric which is compatible 

342 advanced_search_engine = AdvancedSearchEngine(db) # type: ignore[arg-type] 

343 

344 return multi_project_coordinator, advanced_search_engine, app_config 

345 

346 

347# Re-export for backward compatibility 

348from session_buddy.utils.project_analysis import ( 

349 analyze_project_context as _analyze_project_context, 

350) 

351 

352 

353async def analyze_project_context(project_dir: Path) -> dict[str, bool]: 

354 """Analyze project structure and context with enhanced error handling. 

355 

356 This is a backward-compatibility wrapper that delegates to the utility module. 

357 Direct imports from session_buddy.utils.project_analysis are preferred. 

358 """ 

359 return await _analyze_project_context(project_dir) 

360 

361 

362# ===================================== 

363# Health & Status Functions 

364# ===================================== 

365 

366 

367async def health_check( 

368 session_logger: SessionLogger, 

369 permissions_manager: SessionPermissionsManager, 

370 validate_claude_directory: Any, 

371) -> dict[str, Any]: 

372 """Comprehensive health check for MCP server and toolkit availability.""" 

373 # Import feature detection 

374 from session_buddy.core.features import get_feature_flags 

375 

376 _features = get_feature_flags() 

377 crackerjack_integration_available = _features["CRACKERJACK_INTEGRATION_AVAILABLE"] 

378 session_management_available = _features["SESSION_MANAGEMENT_AVAILABLE"] 

379 

380 health_status: dict[str, Any] = { 

381 "overall_healthy": True, 

382 "checks": {}, 

383 "warnings": [], 

384 "errors": [], 

385 } 

386 

387 # MCP Server health 

388 try: 

389 # Test FastMCP availability 

390 health_status["checks"]["mcp_server"] = "✅ Active" 

391 except Exception as e: 

392 health_status["checks"]["mcp_server"] = "❌ Error" 

393 health_status["errors"].append(f"MCP server issue: {e}") 

394 health_status["overall_healthy"] = False 

395 

396 # Session management toolkit health 

397 health_status["checks"]["session_toolkit"] = ( 

398 "✅ Available" if session_management_available else "⚠️ Limited" 

399 ) 

400 if not session_management_available: 400 ↛ 401line 400 didn't jump to line 401 because the condition on line 400 was never true

401 health_status["warnings"].append( 

402 "Session management toolkit not fully available", 

403 ) 

404 

405 # UV package manager health 

406 uv_available = shutil.which("uv") is not None 

407 health_status["checks"]["uv_manager"] = ( 

408 "✅ Available" if uv_available else "❌ Missing" 

409 ) 

410 if not uv_available: 410 ↛ 411line 410 didn't jump to line 411 because the condition on line 410 was never true

411 health_status["warnings"].append("UV package manager not found") 

412 

413 # Claude directory health 

414 validate_claude_directory() 

415 health_status["checks"]["claude_directory"] = "✅ Valid" 

416 

417 # Permissions system health 

418 try: 

419 permissions_status = permissions_manager.get_permission_status() 

420 health_status["checks"]["permissions_system"] = "✅ Active" 

421 health_status["checks"]["session_id"] = ( 

422 f"Active ({permissions_status['session_id']})" 

423 ) 

424 except Exception as e: 

425 health_status["checks"]["permissions_system"] = "❌ Error" 

426 health_status["errors"].append(f"Permissions system issue: {e}") 

427 health_status["overall_healthy"] = False 

428 

429 # Crackerjack integration health 

430 health_status["checks"]["crackerjack_integration"] = ( 

431 "✅ Available" if crackerjack_integration_available else "⚠️ Not Available" 

432 ) 

433 if not crackerjack_integration_available: 433 ↛ 434line 433 didn't jump to line 434 because the condition on line 433 was never true

434 health_status["warnings"].append( 

435 "Crackerjack integration not available - quality monitoring disabled", 

436 ) 

437 

438 # Log health check results 

439 session_logger.info( 

440 "Health check completed", 

441 overall_healthy=health_status["overall_healthy"], 

442 warnings_count=len(health_status["warnings"]), 

443 errors_count=len(health_status["errors"]), 

444 ) 

445 

446 return health_status 

447 

448 

449async def _add_basic_status_info( 

450 output: list[str], 

451 current_dir: Path, 

452 current_project_ref: Any, 

453) -> None: 

454 """Add basic status information to output.""" 

455 current_project_ref = current_dir.name 

456 

457 output.extend( 

458 ( 

459 f"📁 Current project: {current_project_ref}", 

460 f"🗂️ Working directory: {current_dir}", 

461 "🌐 MCP server: Active (Claude Session Management)", 

462 ) 

463 ) 

464 

465 

466async def _add_health_status_info( 

467 output: list[str], 

468 session_logger: SessionLogger, 

469 permissions_manager: SessionPermissionsManager, 

470 validate_claude_directory: Any, 

471) -> None: 

472 """Add health check information to output.""" 

473 health_status = await health_check( 

474 session_logger, 

475 permissions_manager, 

476 validate_claude_directory, 

477 ) 

478 

479 output.append( 

480 f"\n🏥 System Health: {'✅ HEALTHY' if health_status['overall_healthy'] else '⚠️ ISSUES DETECTED'}", 

481 ) 

482 

483 # Display health check results 

484 for check_name, status in health_status["checks"].items(): 

485 friendly_name = check_name.replace("_", " ").title() 

486 output.append(f"{friendly_name}: {status}") 

487 

488 # Show warnings and errors 

489 if health_status["warnings"]: 

490 output.append("\n⚠️ Health Warnings:") 

491 for warning in health_status["warnings"][:3]: # Limit to 3 warnings 

492 output.append(f"{warning}") 

493 

494 if health_status["errors"]: 

495 output.append("\n❌ Health Errors:") 

496 for error in health_status["errors"][:3]: # Limit to 3 errors 

497 output.append(f"{error}") 

498 

499 

500async def _get_project_context_info( 

501 current_dir: Path, 

502) -> tuple[dict[str, Any], int, int]: 

503 """Get project context information and scores.""" 

504 project_context = await analyze_project_context(current_dir) 

505 context_score = sum(1 for detected in project_context.values() if detected) 

506 max_score = len(project_context) 

507 return project_context, context_score, max_score 

508 

509 

510# ===================================== 

511# Quality & Formatting Functions 

512# ===================================== 

513 

514 

515async def _format_quality_results( 

516 quality_score: int, 

517 quality_data: dict[str, Any], 

518 checkpoint_result: dict[str, Any] | None = None, 

519) -> list[str]: 

520 """Format quality assessment results for display.""" 

521 output = [] 

522 

523 # Quality status with version indicator 

524 version = quality_data.get("version", "1.0") 

525 if quality_score >= 80: 

526 output.append( 

527 f"✅ Session quality: EXCELLENT (Score: {quality_score}/100) [V{version}]", 

528 ) 

529 elif quality_score >= 60: 

530 output.append( 

531 f"✅ Session quality: GOOD (Score: {quality_score}/100) [V{version}]", 

532 ) 

533 else: 

534 output.append( 

535 f"⚠️ Session quality: NEEDS ATTENTION (Score: {quality_score}/100) [V{version}]", 

536 ) 

537 

538 # Quality breakdown - V2 format (actual code quality metrics) 

539 output.append("\n📈 Quality breakdown (code health metrics):") 

540 breakdown = quality_data["breakdown"] 

541 output.extend( 

542 ( 

543 f" • Code quality: {breakdown['code_quality']:.1f}/40", 

544 f" • Project health: {breakdown['project_health']:.1f}/30", 

545 f" • Dev velocity: {breakdown['dev_velocity']:.1f}/20", 

546 f" • Security: {breakdown['security']:.1f}/10", 

547 ) 

548 ) 

549 

550 # Trust score (separate from quality) 

551 if "trust_score" in quality_data: 

552 trust = quality_data["trust_score"] 

553 output.extend( 

554 ( 

555 f"\n🔐 Trust score: {trust['total']:.0f}/100 (separate metric)", 

556 f" • Trusted operations: {trust['breakdown']['trusted_operations']:.0f}/40", 

557 f" • Session features: {trust['breakdown']['session_availability']:.0f}/30", 

558 f" • Tool ecosystem: {trust['breakdown']['tool_ecosystem']:.0f}/30", 

559 ) 

560 ) 

561 

562 # Recommendations 

563 recommendations = quality_data["recommendations"] 

564 if recommendations: 

565 output.append("\n💡 Recommendations:") 

566 for rec in recommendations[:3]: 

567 output.append(f"{rec}") 

568 

569 # Session management specific results 

570 if checkpoint_result: 

571 strengths = checkpoint_result.get("strengths", []) 

572 if strengths: 

573 output.append("\n🌟 Session strengths:") 

574 for strength in strengths[:3]: 

575 output.append(f"{strength}") 

576 

577 session_stats = checkpoint_result.get("session_stats", {}) 

578 if session_stats: 

579 output.extend( 

580 ( 

581 "\n⏱️ Session progress:", 

582 f" • Duration: {session_stats.get('duration_minutes', 0)} minutes", 

583 f" • Checkpoints: {session_stats.get('total_checkpoints', 0)}", 

584 f" • Success rate: {session_stats.get('success_rate', 0):.1f}%", 

585 ) 

586 ) 

587 

588 return output 

589 

590 

591async def _perform_git_checkpoint( 

592 current_dir: Path, 

593 quality_score: int, 

594 project_name: str, 

595) -> list[str]: 

596 """Handle git operations for checkpoint commit.""" 

597 output = [] 

598 output.extend(("\n" + "=" * 50, "📦 Git Checkpoint Commit", "=" * 50)) 

599 

600 # Use the proper checkpoint commit function from git_operations 

601 from session_buddy.utils.git_operations import create_checkpoint_commit 

602 

603 success, result, commit_output = create_checkpoint_commit( 

604 current_dir, 

605 project_name, 

606 quality_score, 

607 ) 

608 

609 # Add the commit output to our output 

610 output.extend(commit_output) 

611 

612 if success and result != "clean": 

613 output.append(f"✅ Checkpoint commit created: {result}") 

614 elif not success: 

615 output.append(f"⚠️ Failed to stage files: {result}") 

616 

617 return output 

618 

619 

620async def _format_conversation_summary() -> list[str]: 

621 """Format the conversation summary section.""" 

622 output = [] 

623 with suppress( 

624 ImportError, 

625 ModuleNotFoundError, 

626 RuntimeError, 

627 AttributeError, 

628 ValueError, 

629 ): 

630 from session_buddy.quality_engine import summarize_current_conversation 

631 

632 conversation_summary = await summarize_current_conversation() 

633 if conversation_summary["key_topics"]: 633 ↛ 638line 633 didn't jump to line 638 because the condition on line 633 was always true

634 output.append("\n💬 Current Session Focus:") 

635 for topic in conversation_summary["key_topics"][:3]: 

636 output.append(f"{topic}") 

637 

638 if conversation_summary["decisions_made"]: 638 ↛ 642line 638 didn't jump to line 642

639 output.append("\n✅ Key Decisions:") 

640 for decision in conversation_summary["decisions_made"][:2]: 

641 output.append(f"{decision}") 

642 return output 

643 

644 

645# ===================================== 

646# Utility Functions 

647# ===================================== 

648 

649 

650def _should_retry_search(error: Exception) -> bool: 

651 """Determine if a search error warrants a retry with cleanup.""" 

652 # Retry for database connection issues or temporary errors 

653 error_msg = str(error).lower() 

654 retry_conditions = [ 

655 "database is locked", 

656 "connection failed", 

657 "temporary failure", 

658 "timeout", 

659 "index not found", 

660 ] 

661 return any(condition in error_msg for condition in retry_conditions) 

662 

663 

664# ===================================== 

665# Feature Detection (Phase 2.6) 

666# =====================================