Coverage for session_buddy / server.py: 51.94%

387 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""Claude Session Management MCP Server - FastMCP Version. 

3 

4A dedicated MCP server that provides session management functionality 

5including initialization, checkpoints, and cleanup across all projects. 

6 

7This server can be included in any project's .mcp.json file to provide 

8automatic access to /session-init, /session-checkpoint, 

9and /session-end slash commands. 

10""" 

11 

12from __future__ import annotations 

13 

14import asyncio 

15import importlib.util 

16import inspect 

17import os 

18import sys 

19import warnings 

20from contextlib import asynccontextmanager, suppress 

21from pathlib import Path 

22from typing import TYPE_CHECKING, Any 

23 

24if TYPE_CHECKING: 

25 import logging 

26 from collections.abc import AsyncGenerator 

27 

28 from mcp_common.exceptions import DependencyMissingError 

29 

# Suppress transformers warnings about PyTorch/TensorFlow for cleaner CLI output
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
warnings.filterwarnings("ignore", message=".*PyTorch.*TensorFlow.*Flax.*")

# DEBUG: Patch CallToolRequestParams to be hashable and log when hash is called
# This helps us identify where the unhashable type error is coming from
from pathlib import Path as _PatchPath

# The patch is optional: it only runs when a patch_hashable.py file exists two
# directory levels above this file (the parent of this module's directory).
_patch_file = _PatchPath(__file__).parent.parent / "patch_hashable.py"
if _patch_file.exists():
    import importlib.util as _util

    spec = _util.spec_from_file_location("patch_hashable", _patch_file)
    if spec and spec.loader:
        patch_module = _util.module_from_spec(spec)
        # Register under a fixed name before executing the file's code.
        sys.modules["patch_hashable"] = patch_module
        spec.loader.exec_module(patch_module)

47 

48# Phase 2.5: Import core infrastructure from server_core 

49from session_buddy.core.features import get_feature_flags 

50from session_buddy.core.lifecycle.service_registry import get_service_registry 

51from session_buddy.core.permissions import SessionPermissionsManager 

52from session_buddy.di import get_sync_typed 

53from session_buddy.di.container import depends 

54from session_buddy.server_core import _load_mcp_config 

55from session_buddy.server_core import ( 

56 # Health & status functions 

57 health_check as _health_check_impl, 

58) 

59from session_buddy.server_core import ( 

60 initialize_new_features as _initialize_new_features_impl, 

61) 

62from session_buddy.server_core import ( 

63 # Session lifecycle handler 

64 session_lifecycle as _session_lifecycle_impl, 

65) 

66from session_buddy.utils.runtime_snapshots import ( 

67 RuntimeSnapshotManager, 

68 run_snapshot_loop, 

69) 

70 

71 

72# Get logger using standard logging (avoid DI type resolution conflicts) 

73def _get_session_logger() -> logging.Logger: 

74 """Get logger using standard logging module.""" 

75 import logging 

76 

77 return logging.getLogger(__name__) 

78 

79 

80# Initialize global session_logger as None to prevent undefined variable 

81session_logger: logging.Logger | None = None 

82 

83 

def _get_logger() -> logging.Logger:
    """Return the shared module logger, creating it on the first call.

    The instance is cached in the module-level ``session_logger`` so later
    calls reuse the same object.
    """
    global session_logger
    cached = session_logger
    if cached is None:
        cached = session_logger = _get_session_logger()
    # Narrowing for type checkers: the cache is populated by this point.
    assert cached is not None
    return cached

92 

93 

94# Check mcp-common exceptions availability (must be defined early for FastMCP import) 

95EXCEPTIONS_AVAILABLE = importlib.util.find_spec("mcp_common.exceptions") is not None 

96 

97if EXCEPTIONS_AVAILABLE: 97 ↛ 101line 97 didn't jump to line 101 because the condition on line 97 was always true

98 from mcp_common.exceptions import DependencyMissingError 

99 

# Check token optimizer availability (Phase 3.3 M2: improved pattern)
TOKEN_OPTIMIZER_AVAILABLE = (
    importlib.util.find_spec("session_buddy.token_optimizer") is not None
)

if TOKEN_OPTIMIZER_AVAILABLE:
    from session_buddy.token_optimizer import (
        get_cached_chunk,
        get_token_usage_stats,
        optimize_search_response,
        track_token_usage,
    )
else:
    # Fallback implementations when token optimizer unavailable
    # The flag is already False on this branch; it is re-assigned explicitly.
    TOKEN_OPTIMIZER_AVAILABLE = False

    async def optimize_search_response(
        results: list[dict[str, Any]],
        strategy: str = "prioritize_recent",
        max_tokens: int = 4000,
    ) -> tuple[list[dict[str, Any]], dict[str, Any]]:
        """Fallback: return ``results`` unchanged with empty optimization info."""
        return results, {}

    async def track_token_usage(
        operation: str,
        request_tokens: int,
        response_tokens: int,
        optimization_applied: str | None = None,
    ) -> None:
        """Fallback: accept the usage record and do nothing with it."""
        return None

    async def get_cached_chunk(
        cache_key: str,
        chunk_index: int,
    ) -> dict[str, Any] | None:
        """Fallback: always report that no chunk is available."""
        return None

    async def get_token_usage_stats(hours: int = 24) -> dict[str, Any]:
        """Fallback: return a status dict saying the optimizer is unavailable."""
        return {"status": "token optimizer unavailable"}

    async def optimize_memory_usage(
        strategy: str = "auto",
        max_age_days: int = 30,
        dry_run: bool = True,
    ) -> str:
        """Fallback: return a fixed message saying the optimizer is unavailable."""
        return "❌ Token optimizer not available"

146 

147 

# Import FastMCP with test environment fallback
try:
    from fastmcp import FastMCP

    MCP_AVAILABLE = True
except ImportError:
    # A test run is detected either by pytest being loaded or by "test"
    # appearing in the launching script's name.
    if "pytest" in sys.modules or "test" in sys.argv[0].lower():
        from tests.conftest import MockFastMCP

        FastMCP = MockFastMCP  # type: ignore[no-redef,misc]
        MCP_AVAILABLE = False
    elif EXCEPTIONS_AVAILABLE:
        # DependencyMissingError is only imported at runtime when
        # mcp_common.exceptions could be located, hence this guard.
        raise DependencyMissingError(
            message="FastMCP is required but not installed",
            dependency="fastmcp",
            install_command="uv add fastmcp",
        )
    else:
        # Fallback to sys.exit if exceptions unavailable
        sys.exit(1)

168 

# Phase 2.6: Get all feature flags from centralized detector
# Each flag is copied into a module-level constant; a key missing from the
# returned mapping raises KeyError at import time.
_features = get_feature_flags()
SESSION_MANAGEMENT_AVAILABLE = _features["SESSION_MANAGEMENT_AVAILABLE"]
REFLECTION_TOOLS_AVAILABLE = _features["REFLECTION_TOOLS_AVAILABLE"]
ENHANCED_SEARCH_AVAILABLE = _features["ENHANCED_SEARCH_AVAILABLE"]
UTILITY_FUNCTIONS_AVAILABLE = _features["UTILITY_FUNCTIONS_AVAILABLE"]
MULTI_PROJECT_AVAILABLE = _features["MULTI_PROJECT_AVAILABLE"]
ADVANCED_SEARCH_AVAILABLE = _features["ADVANCED_SEARCH_AVAILABLE"]
CONFIG_AVAILABLE = _features["CONFIG_AVAILABLE"]
AUTO_CONTEXT_AVAILABLE = _features["AUTO_CONTEXT_AVAILABLE"]
MEMORY_OPTIMIZER_AVAILABLE = _features["MEMORY_OPTIMIZER_AVAILABLE"]
APP_MONITOR_AVAILABLE = _features["APP_MONITOR_AVAILABLE"]
LLM_PROVIDERS_AVAILABLE = _features["LLM_PROVIDERS_AVAILABLE"]
SERVERLESS_MODE_AVAILABLE = _features["SERVERLESS_MODE_AVAILABLE"]
CRACKERJACK_INTEGRATION_AVAILABLE = _features["CRACKERJACK_INTEGRATION_AVAILABLE"]

# Global feature instances (initialized on-demand)
multi_project_coordinator: Any = None
advanced_search_engine: Any = None
app_config: Any = None
current_project: str | None = None
# Starts as None; _get_permissions_manager() assigns the real instance.
permissions_manager: SessionPermissionsManager = None  # type: ignore[assignment]

191 

192 

def _get_permissions_manager() -> SessionPermissionsManager:
    """Return the session permissions manager, building one if necessary.

    Resolution order:

    1. An instance already resolvable through ``get_sync_typed``.
    2. A new manager built from the ``SessionPaths`` resolved through
       ``get_sync_typed``.
    3. A new manager built from ``SessionPaths.from_home()`` after its
       directories have been ensured.

    Managers built in steps 2 and 3 are registered with ``depends.set``. In
    every case the result is also stored in the module-level
    ``permissions_manager``.
    """
    global permissions_manager

    # Any failure during lookup is suppressed so the next strategy can run.
    with suppress(Exception):
        manager = get_sync_typed(SessionPermissionsManager)
        if isinstance(manager, SessionPermissionsManager):
            permissions_manager = manager
            return manager

    from session_buddy.di.config import SessionPaths

    # Construction and registration also sit inside the suppress block, so an
    # error there falls through to the home-directory fallback below.
    with suppress(Exception):
        paths = get_sync_typed(SessionPaths)
        if isinstance(paths, SessionPaths):
            manager = SessionPermissionsManager(paths.claude_dir)
            depends.set(SessionPermissionsManager, manager)
            permissions_manager = manager
            return manager

    # Last resort: not wrapped in suppress, so errors here propagate.
    paths = SessionPaths.from_home()
    paths.ensure_directories()
    manager = SessionPermissionsManager(paths.claude_dir)
    depends.set(SessionPermissionsManager, manager)
    permissions_manager = manager
    return manager

218 

219 

220# Import required components for automatic lifecycle 

221from session_buddy.core import SessionLifecycleManager 

222from session_buddy.reflection_tools import get_reflection_database 

223 

224 

225# Token optimizer helpers (only used when available) 

226def _build_memory_optimization_policy( 

227 strategy: str, max_age_days: int 

228) -> dict[str, Any]: 

229 if strategy == "aggressive": 229 ↛ 230line 229 didn't jump to line 230 because the condition on line 229 was never true

230 importance_threshold = 0.3 

231 elif strategy == "conservative": 231 ↛ 232line 231 didn't jump to line 232 because the condition on line 231 was never true

232 importance_threshold = 0.7 

233 else: 

234 importance_threshold = 0.5 

235 

236 return { 

237 "consolidation_age_days": max_age_days, 

238 "importance_threshold": importance_threshold, 

239 } 

240 

241 

242def _format_memory_optimization_results(results: dict[str, Any], dry_run: bool) -> str: 

243 header = "🧠 Memory Optimization Results" 

244 if dry_run: 

245 header += " (DRY RUN)" 

246 

247 lines = [header] 

248 lines.append(f"Total Conversations: {results.get('total_conversations', 0)}") 

249 lines.append(f"Conversations to Keep: {results.get('conversations_to_keep', 0)}") 

250 lines.append( 

251 f"Conversations to Consolidate: {results.get('conversations_to_consolidate', 0)}" 

252 ) 

253 lines.append(f"Clusters Created: {results.get('clusters_created', 0)}") 

254 

255 saved = results.get("space_saved_estimate") 

256 if isinstance(saved, (int, float)): 

257 lines.append(f"{saved:,.0f} characters saved") 

258 

259 ratio = results.get("compression_ratio") 

260 if isinstance(ratio, (int, float)): 

261 lines.append(f"{ratio * 100:.1f}% compression ratio") 

262 

263 summaries: list[Any] = results.get("consolidated_summaries") or [] 

264 if summaries: 

265 first = summaries[0] 

266 if isinstance(first, dict) and "original_count" in first: 

267 lines.append(f"{first['original_count']} conversations → 1 summary") 

268 

269 if dry_run: 

270 lines.append("Run with dry_run=False to apply changes") 

271 

272 return "\n".join(lines) 

273 

274 

if TOKEN_OPTIMIZER_AVAILABLE:

    async def optimize_memory_usage(
        strategy: str = "auto",
        max_age_days: int = 30,
        dry_run: bool = True,
    ) -> str:
        """Compress stored conversation memory and return a text report.

        Args:
            strategy: Passed to ``_build_memory_optimization_policy`` to pick
                the importance threshold.
            max_age_days: Passed to ``_build_memory_optimization_policy`` as
                the consolidation age.
            dry_run: Forwarded to ``MemoryOptimizer.compress_memory`` and to
                the report formatter.

        Returns:
            The formatted report, or a message starting with "❌" when a
            prerequisite flag is off, the optimizer returns a dict containing
            ``"error"``, or any exception is raised.
        """
        # NOTE(review): the guard tests the reflection-tools and
        # memory-optimizer flags, while the message names the token optimizer.
        # Confirm the intended wording.
        if not REFLECTION_TOOLS_AVAILABLE or not MEMORY_OPTIMIZER_AVAILABLE:
            return "❌ Memory optimization requires both token optimizer and reflection tools"

        try:
            db = await get_reflection_database()
            from session_buddy.memory_optimizer import MemoryOptimizer

            policy = _build_memory_optimization_policy(strategy, max_age_days)
            # Type ignore: get_reflection_database returns ReflectionDatabaseAdapterOneiric
            # which is compatible with MemoryOptimizer's expected type
            optimizer = MemoryOptimizer(db)  # type: ignore[arg-type]
            results = await optimizer.compress_memory(policy=policy, dry_run=dry_run)

            if isinstance(results, dict) and "error" in results:
                return f"❌ Memory optimization error: {results['error']}"

            return _format_memory_optimization_results(results, dry_run)
        except Exception as e:
            # Failures are reported as text rather than raised to the caller.
            return f"❌ Error optimizing memory: {e}"

301 

302 

303# Check mcp-common ServerPanels availability (Phase 3.3 M2: improved pattern) 

304SERVERPANELS_AVAILABLE = importlib.util.find_spec("mcp_common.ui") is not None 

305 

306# Check mcp-common security availability (Phase 3.3 M2: improved pattern) 

307SECURITY_AVAILABLE = importlib.util.find_spec("mcp_common.security") is not None 

308 

309# Check FastMCP rate limiting middleware availability (Phase 3.3 M2: improved pattern) 

310RATE_LIMITING_AVAILABLE = ( 

311 importlib.util.find_spec("fastmcp.server.middleware.rate_limiting") is not None 

312) 

313 

314# Phase 2.2: Import utility and formatting functions from server_helpers 

315 

316 

def _get_lifecycle_manager() -> SessionLifecycleManager:
    """Return the session lifecycle manager, creating and registering one if needed.

    A failed lookup, or a lookup that yields something other than a
    ``SessionLifecycleManager``, falls through to constructing a new instance
    and registering it with ``depends.set``.
    """
    with suppress(Exception):
        existing = get_sync_typed(SessionLifecycleManager)
        if isinstance(existing, SessionLifecycleManager):
            return existing

    fresh = SessionLifecycleManager()
    depends.set(SessionLifecycleManager, fresh)
    return fresh

325 

326 

327# Lifespan handler wrapper for FastMCP 

@asynccontextmanager
async def session_lifecycle(app: Any) -> AsyncGenerator[None]:
    """Automatic session lifecycle for git repositories only (wrapper).

    Initializes the service registry, enters the core lifecycle context
    imported from ``server_core``, writes startup snapshots, and keeps a
    background snapshot task running until the context exits.

    Args:
        app: Application object forwarded to the core lifecycle handler.

    Yields:
        None, once the startup snapshots are written and the snapshot loop is
        scheduled.
    """
    registry = get_service_registry()
    await registry.init_all()
    lifecycle_manager = _get_lifecycle_manager()
    async with _session_lifecycle_impl(app, lifecycle_manager, _get_logger()):  # type: ignore[arg-type]
        snapshot_manager = RuntimeSnapshotManager.for_server("session-buddy")
        pid = os.getpid()
        snapshot_manager.record("startup_events")
        snapshot_manager.write_health_snapshot(pid=pid, watchers_running=True)
        snapshot_manager.write_telemetry_snapshot(pid=pid)

        # Refresh at half the health TTL, but never more often than every 5 s.
        interval_seconds = max(snapshot_manager.settings.health_ttl_seconds / 2, 5.0)
        snapshot_task = asyncio.create_task(
            run_snapshot_loop(snapshot_manager, pid, interval_seconds),
        )

        try:
            yield
        finally:
            # NOTE(review): this nesting was reconstructed from a listing that
            # had lost its indentation. Confirm that every shutdown step below,
            # including registry.cleanup_all(), belongs in this finally clause.
            snapshot_task.cancel()
            # Awaiting the cancelled task raises CancelledError; swallow it.
            with suppress(asyncio.CancelledError):
                await snapshot_task
            snapshot_manager.record("shutdown_events")
            snapshot_manager.write_health_snapshot(pid=pid, watchers_running=False)
            snapshot_manager.write_telemetry_snapshot(pid=pid)
            await registry.cleanup_all()

356 

357 

358# Load configuration and initialize FastMCP 2.0 server with lifespan 

359_mcp_config = _load_mcp_config() 

360 

361# Initialize MCP server with lifespan 

362mcp = FastMCP("session-buddy", lifespan=session_lifecycle) 

363 

364# Add rate limiting middleware (Phase 3 Security Hardening) 

365# NOTE: Disabled temporarily due to FastMCP bug where MiddlewareContext becomes unhashable 

366# when it contains CallToolRequestParams (which has __hash__ = None). 

367# See: https://github.com/jlowin/fastmcp/issues for tracking 

368# if RATE_LIMITING_AVAILABLE: 

369# from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware 

370# 

371# rate_limiter = RateLimitingMiddleware( 

372# max_requests_per_second=10.0, # Sustainable rate for session management operations 

373# burst_capacity=30, # Allow bursts for checkpoint/status operations 

374# global_limit=True, # Protect the session management server globally 

375# ) 

376# # Use public API (Phase 3.1 C1 fix: standardize middleware access) 

377# mcp.add_middleware(rate_limiter) 

378# _get_logger().info("Rate limiting enabled: 10 req/sec, burst 30") 

379 

380# Register extracted tool modules following crackerjack architecture patterns 

381# Import LLM provider validation (Phase 3 Security Hardening) 

382from session_buddy.config.feature_flags import get_feature_flags as get_rollout_flags 

383from session_buddy.memory.migration import ( 

384 migrate_v1_to_v2 as _migrate_v1_to_v2, 

385) 

386from session_buddy.memory.migration import ( 

387 needs_migration as _needs_migration, 

388) 

389 

390from .llm.security import validate_llm_api_keys_at_startup 

391from .tools import ( 

392 register_access_log_tools, 

393 register_conscious_agent_tools, 

394 register_crackerjack_tools, 

395 register_extraction_tools, 

396 register_feature_flags_tools, 

397 register_knowledge_graph_tools, 

398 register_llm_tools, 

399 register_migration_tools, 

400 register_monitoring_tools, 

401 register_prompt_tools, 

402 register_search_tools, 

403 register_serverless_tools, 

404 register_session_tools, 

405 register_team_tools, 

406) 

407 

408# Import utility functions 

409from .utils import ( 

410 _format_search_results, 

411 validate_claude_directory, 

412) 

413 

# Register all extracted tool modules
# Each register_* function is handed the shared FastMCP instance created above.
register_access_log_tools(mcp)
register_conscious_agent_tools(mcp)
register_crackerjack_tools(mcp)
register_extraction_tools(mcp)
register_feature_flags_tools(mcp)
register_knowledge_graph_tools(mcp)  # DuckPGQ knowledge graph tools
register_llm_tools(mcp)
register_migration_tools(mcp)
register_monitoring_tools(mcp)
register_prompt_tools(mcp)
register_search_tools(mcp)
register_serverless_tools(mcp)
register_session_tools(mcp)
register_team_tools(mcp)

429 

430 

431# Add helper method for programmatic tool calling used in tests 

432async def _resolve_tool_registry(mcp_instance: Any) -> dict[str, Any]: 

433 """Return the registered tool mapping for an MCP instance.""" 

434 if hasattr(mcp_instance, "get_tools"): 

435 return await mcp_instance.get_tools() # type: ignore[no-any-return] 

436 if hasattr(mcp_instance, "tools"): 

437 return mcp_instance.tools # type: ignore[no-any-return] 

438 return getattr(mcp_instance, "_tools", {}) 

439 

440 

441def _resolve_tool_callable(tool_spec: Any, tool_name: str) -> Any: 

442 """Extract the callable implementation from a tool spec.""" 

443 if hasattr(tool_spec, "function"): 

444 return tool_spec.function 

445 if isinstance(tool_spec, dict) and "function" in tool_spec: 

446 return tool_spec["function"] 

447 if callable(tool_spec): 

448 return tool_spec 

449 

450 candidate = getattr(tool_spec, "implementation", None) or getattr( 

451 tool_spec, "handler", None 

452 ) 

453 if candidate is None: 

454 msg = f"Could not extract callable function from tool {tool_name}" 

455 raise ValueError(msg) 

456 return candidate 

457 

458 

459def _build_tool_arguments( 

460 tool_func: Any, provided_args: dict[str, Any] 

461) -> dict[str, Any]: 

462 """Filter provided args to match the callable signature.""" 

463 sig = inspect.signature(tool_func) 

464 filtered_args: dict[str, Any] = {} 

465 for param_name, param in sig.parameters.items(): 

466 if param_name in provided_args: 

467 filtered_args[param_name] = provided_args[param_name] 

468 elif param.default is not param.empty: 

469 filtered_args[param_name] = param.default 

470 return filtered_args 

471 

472 

async def _call_registered_tool(
    mcp_instance: Any, tool_name: str, arguments: dict[str, Any] | None = None
) -> Any:
    """Programmatically call a tool by name with provided arguments.

    Args:
        mcp_instance: Object whose tool registry is searched.
        tool_name: Name the tool was registered under.
        arguments: Keyword arguments for the tool. Names the tool does not
            accept are dropped and declared defaults fill the gaps. ``None``
            is treated as no arguments.

    Returns:
        The tool's result, awaited when the tool produces an awaitable.

    Raises:
        ValueError: If the tool is not registered or no callable can be
            extracted from its registry entry.
    """
    provided_args = arguments or {}
    tools = await _resolve_tool_registry(mcp_instance)

    if tool_name not in tools:
        msg = f"Tool '{tool_name}' is not registered"
        raise ValueError(msg)

    tool_func = _resolve_tool_callable(tools[tool_name], tool_name)
    filtered_args = _build_tool_arguments(tool_func, provided_args)

    # Inspect the result rather than the callable. ``iscoroutinefunction``
    # misses partials of async functions and objects with an async
    # ``__call__``, which would otherwise return an un-awaited coroutine.
    outcome = tool_func(**filtered_args)
    if inspect.isawaitable(outcome):
        return await outcome
    return outcome

490 

491 

492# Attach the method to the mcp instance as a bound method 

async def _call_tool_bound(
    tool_name: str, arguments: dict[str, Any] | None = None
) -> Any:
    """Call a registered tool on the module-level ``mcp`` instance by name."""
    return await _call_registered_tool(
        mcp_instance=mcp,
        tool_name=tool_name,
        arguments=arguments,
    )

498 

499 

500# CRITICAL: DO NOT override mcp._call_tool - it breaks FastMCP's internal middleware handling! 

501# FastMCP's _call_tool expects MiddlewareContext, our function expects string tool_name 

502# mcp._call_tool = _call_tool_bound # DISABLED - causes "Tool 'MiddlewareContext(...)' is not registered" 

503 

504 

async def reflect_on_past(
    query: str,
    limit: int = 5,
    min_score: float = 0.7,
    project: str | None = None,
    optimize_tokens: bool = True,
    max_tokens: int = 4000,
) -> str:
    """Search past conversations with optional token optimization.

    Args:
        query: Search text.
        limit: Maximum number of results requested from the database.
        min_score: Minimum score passed to the database search.
        project: Project filter; the search falls back to the module-level
            ``current_project`` when this is falsy.
        optimize_tokens: Run the token optimizer over the results when it is
            available.
        max_tokens: Token budget handed to the optimizer.

    Returns:
        Formatted search output, or a message starting with "❌" when the
        reflection system is unavailable or the search fails.
    """
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: uv sync --extra embeddings"

    database = await _initialize_reflection_database()
    if not database:
        return "❌ Reflection system not available. Install optional dependencies with `uv sync --extra embeddings`"

    matches = await _search_conversations(database, query, project, limit, min_score)
    if isinstance(matches, str):
        # A string result is the search helper's error message.
        return matches

    should_optimize = optimize_tokens and TOKEN_OPTIMIZER_AVAILABLE
    if should_optimize:
        matches, token_report = await _optimize_results(matches, max_tokens)
    else:
        token_report = {}

    return _format_reflection_output(query, matches, token_report)

535 

536 

async def _initialize_reflection_database() -> Any | None:
    """Return the reflection database, or ``None`` if it cannot be obtained.

    Failures are logged with their traceback instead of being raised.
    """
    try:
        database = await get_reflection_database()
    except Exception as exc:  # pragma: no cover - defensive logging
        _get_logger().exception(
            "Failed to initialize reflection database",
            exc_info=exc,
        )
        return None
    return database

547 

548 

async def _search_conversations(
    db: Any,
    query: str,
    project: str | None,
    limit: int,
    min_score: float,
) -> Any | str:
    """Run a conversation search inside the database's async context.

    Returns:
        Whatever ``db.search_conversations`` returns, or a message starting
        with "❌" if anything raises. The failure is also logged.
    """
    try:
        search_kwargs = {
            "query": query,
            # Fall back to the module-level current project when none is given.
            "project": project or current_project,
            "limit": limit,
            "min_score": min_score,
        }
        async with db:
            return await db.search_conversations(**search_kwargs)
    except Exception as exc:
        _get_logger().exception("Reflection search failed", extra={"query": query})
        return f"❌ Error searching conversations: {exc}"

568 

569 

async def _optimize_results(
    results: list[dict[str, Any]],
    max_tokens: int,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Apply token optimization to search results and record the usage.

    Returns:
        The results paired with the optimizer's info dict. The optimized
        results replace the input only when they are non-empty. If any step
        raises, a warning is logged and the current results are returned with
        an empty info dict.
    """
    try:
        optimized, info = await optimize_search_response(
            results,
            strategy="prioritize_recent",
            max_tokens=max_tokens,
        )
        if optimized:
            results = optimized

        tokens_saved = info.get("token_savings", {}).get("tokens_saved", 0)
        await track_token_usage(
            operation="reflect_on_past",
            request_tokens=max_tokens,
            response_tokens=max_tokens - tokens_saved,
            optimization_applied=info.get("strategy"),
        )
    except Exception as exc:
        _get_logger().warning(
            "Token optimization failed for reflect_on_past",
            extra={"error": str(exc)},
        )
        return results, {}
    return results, info

598 

599 

600def _format_reflection_output( 

601 query: str, 

602 results: list[dict[str, Any]], 

603 optimization_info: dict[str, Any], 

604) -> str: 

605 """Format the reflection output.""" 

606 if not results: 

607 return ( 

608 f"🔍 No relevant conversations found for query: '{query}'\n" 

609 "💡 Try adjusting the search terms or lowering min_score." 

610 ) 

611 

612 output_lines = [ 

613 f"🔍 **Search Results for: '{query}'**", 

614 "", 

615 f"📊 Found {len(results)} relevant conversations", 

616 "", 

617 ] 

618 

619 token_savings = ( 

620 optimization_info.get("token_savings") 

621 if isinstance(optimization_info, dict) 

622 else None 

623 ) 

624 if token_savings and token_savings.get("savings_percentage") is not None: 

625 output_lines.extend( 

626 ( 

627 f"⚡ Token optimization: {token_savings.get('savings_percentage')}% saved", 

628 "", 

629 ) 

630 ) 

631 

632 output_lines.extend(_format_search_results(results)) 

633 return "\n".join(output_lines) 

634 

635 

636# Wrapper for initialize_new_features that manages global state 

async def initialize_new_features() -> None:
    """Initialize multi-project coordination and advanced search features (wrapper).

    The current module-level instances are passed to the ``server_core``
    implementation, and whatever it returns is stored back into the same
    three globals.
    """
    global multi_project_coordinator, advanced_search_engine, app_config

    initialized = await _initialize_new_features_impl(
        _get_logger(),  # type: ignore[arg-type]
        multi_project_coordinator,
        advanced_search_engine,
        app_config,
    )
    multi_project_coordinator, advanced_search_engine, app_config = initialized

652 

653 

654# Phase 2.3: Import quality engine functions 

655from session_buddy.quality_engine import ( 

656 calculate_quality_score as _calculate_quality_score_impl, 

657) 

658 

659 

660# Expose quality scoring function for external use 

async def calculate_quality_score(project_dir: Path | None = None) -> dict[str, Any]:
    """Calculate session quality score using V2 algorithm.

    This function provides a consistent interface for calculating quality scores
    across the system.

    Args:
        project_dir: Path to the project directory. If not provided, the ``PWD``
            environment variable is used, falling back to the current working
            directory when ``PWD`` is unset.

    Returns:
        Dict with quality score and breakdown information.

    """
    if project_dir is None:
        pwd = os.environ.get("PWD")
        # Only ask for the working directory when PWD is missing. Passing
        # Path.cwd() as the .get() default evaluated it on every call, which
        # can raise (e.g. the directory was deleted) even though PWD is set.
        project_dir = Path(pwd) if pwd is not None else Path.cwd()

    return await _calculate_quality_score_impl(project_dir=project_dir)

678 

679 

680# Wrapper for health_check that provides required parameters 

async def health_check() -> dict[str, Any]:
    """Comprehensive health check for MCP server and toolkit availability (wrapper).

    Supplies the logger, the permissions manager and the Claude-directory
    validator that the ``server_core`` implementation takes as arguments.
    """
    logger = _get_logger()
    permissions = _get_permissions_manager()
    return await _health_check_impl(
        logger,  # type: ignore[arg-type]
        permissions,
        validate_claude_directory,
    )

688 

689 

690# Phase 2.4: Import advanced feature tools from advanced_features module 

691from session_buddy.advanced_features import ( 

692 add_project_dependency, 

693 # Advanced Search Tools (3 MCP tools) 

694 advanced_search, 

695 cancel_user_reminder, 

696 # Natural Language Scheduling Tools (5 MCP tools) 

697 create_natural_reminder, 

698 # Multi-Project Coordination Tools (4 MCP tools) 

699 create_project_group, 

700 # Interruption Management Tools (1 MCP tool) 

701 get_interruption_statistics, 

702 get_project_insights, 

703 get_search_metrics, 

704 # Git Worktree Management Tools (3 MCP tools) 

705 git_worktree_add, 

706 git_worktree_remove, 

707 git_worktree_switch, 

708 list_user_reminders, 

709 search_across_projects, 

710 search_suggestions, 

711 # Session Welcome Tool (1 MCP tool) 

712 session_welcome, 

713 start_reminder_service, 

714 stop_reminder_service, 

715) 

716 

# Register all 17 advanced MCP tools
# mcp.tool() is called with no arguments; the decorator it returns is applied
# to each imported function.
mcp.tool()(create_natural_reminder)
mcp.tool()(list_user_reminders)
mcp.tool()(cancel_user_reminder)
mcp.tool()(start_reminder_service)
mcp.tool()(stop_reminder_service)
mcp.tool()(get_interruption_statistics)
mcp.tool()(create_project_group)
mcp.tool()(add_project_dependency)
mcp.tool()(search_across_projects)
mcp.tool()(get_project_insights)
mcp.tool()(advanced_search)
mcp.tool()(search_suggestions)
mcp.tool()(get_search_metrics)
mcp.tool()(git_worktree_add)
mcp.tool()(git_worktree_remove)
mcp.tool()(git_worktree_switch)
mcp.tool()(session_welcome)

735 

736 

def _perform_startup_validation() -> None:
    """Validate LLM API keys at startup when LLM providers are available.

    Validation never aborts startup: ``ImportError`` and ``ValueError`` are
    logged as warnings, and any other exception is logged with its traceback.
    """
    if LLM_PROVIDERS_AVAILABLE:
        try:
            validate_llm_api_keys_at_startup()
        except (ImportError, ValueError) as validation_error:
            _get_logger().warning(
                f"LLM API key validation skipped (optional feature): {validation_error}",
            )
        except Exception:
            _get_logger().exception("Unexpected error during LLM validation")

750 

751 

def _initialize_features() -> None:
    """Run the optional schema migration, then initialize optional features.

    Neither step can abort startup. Migration problems are logged as warnings,
    and feature-initialization failures are logged as a warning
    (``ImportError``/``RuntimeError``) or with a traceback (anything else).
    """
    try:
        # Optionally run auto-migration when v2 is enabled via rollout flags
        try:
            rollout = get_rollout_flags()
            if rollout.use_schema_v2 and _needs_migration():
                _get_logger().info("Auto-migration: v1 detected, migrating to v2...")
                outcome = _migrate_v1_to_v2()
                if not outcome.success:
                    _get_logger().warning(
                        "Migration failed; continuing with legacy schema",
                        extra={"error": outcome.error},
                    )
                else:
                    _get_logger().info(
                        "Migration complete", extra={"stats": outcome.stats}
                    )
        except Exception as migration_error:
            _get_logger().warning(
                f"Migration check error (optional): {migration_error}"
            )

        asyncio.run(initialize_new_features())
    except (ImportError, RuntimeError) as init_error:
        _get_logger().warning(
            f"Feature initialization skipped (optional): {init_error}"
        )
    except Exception:
        _get_logger().exception("Unexpected error during feature init")

776 

777 

def _build_feature_list() -> list[str]:
    """Assemble the feature names shown in the startup banner.

    Five core entries are always present. Two more are appended according to
    the module-level availability flags.
    """
    features = [
        "Session Lifecycle Management",
        "Memory & Reflection System",
        "Crackerjack Quality Integration",
        "Knowledge Graph (DuckPGQ)",
        "LLM Provider Management",
    ]
    # NOTE(review): the rate-limiting entry depends only on the middleware
    # module being importable, while the middleware registration in this file
    # is commented out. Confirm the banner should still advertise it.
    optional_entries = (
        (SECURITY_AVAILABLE, "🔒 API Key Validation (OpenAI/Gemini)"),
        (RATE_LIMITING_AVAILABLE, "⚡ Rate Limiting (10 req/sec, burst 30)"),
    )
    features.extend(label for enabled, label in optional_entries if enabled)
    return features

792 

793 

def _display_http_startup(host: str, port: int, features: list[str]) -> None:
    """Announce HTTP-mode startup.

    Uses the ``ServerPanels`` startup panel when ``mcp_common.ui`` is
    available, and plain lines on stderr otherwise.
    """
    endpoint = f"http://{host}:{port}/mcp"

    if SERVERPANELS_AVAILABLE:
        from mcp_common.ui import ServerPanels

        ServerPanels.startup_success(
            server_name="Session Management MCP",
            version="2.0.0",
            features=features,
            endpoint=endpoint,
            websocket_monitor=str(_mcp_config.get("websocket_monitor_port", 8677)),
            transport="HTTP (streamable)",
        )
        return

    # Fallback to simple print when ServerPanels not available
    print("✅ Session Management MCP v2.0.0", file=sys.stderr)
    print(f"🔗 Endpoint: {endpoint}", file=sys.stderr)
    print("📡 Transport: HTTP (streamable)", file=sys.stderr)
    if features:
        print(f"🎯 Features: {', '.join(features)}", file=sys.stderr)

814 

815 

def _display_stdio_startup(features: list[str]) -> None:
    """Announce STDIO-mode startup.

    Uses the ``ServerPanels`` startup panel when ``mcp_common.ui`` is
    available, and plain lines on stderr otherwise.
    """
    if SERVERPANELS_AVAILABLE:
        from mcp_common.ui import ServerPanels

        ServerPanels.startup_success(
            server_name="Session Management MCP",
            version="2.0.0",
            features=features,
            transport="STDIO",
            mode="Claude Desktop",
        )
        return

    # Fallback to simple print when ServerPanels not available
    print("✅ Session Management MCP v2.0.0", file=sys.stderr)
    print("📡 Transport: STDIO (Claude Desktop)", file=sys.stderr)
    if features:
        print(f"🎯 Features: {', '.join(features)}", file=sys.stderr)

834 

835 

def main(http_mode: bool = False, http_port: int | None = None) -> None:
    """Main entry point for the MCP server.

    Args:
        http_mode: Force the streamable HTTP transport. The ``http_enabled``
            config value is used when this is false.
        http_port: Port override. A falsy value (``None`` or ``0``) falls back
            to the ``http_port`` config value, then to 8678.
    """
    # Perform startup validation and initialization
    _perform_startup_validation()
    _initialize_features()

    # Resolve settings, letting explicit arguments win over configuration
    host = _mcp_config.get("http_host", "127.0.0.1")
    port = http_port or _mcp_config.get("http_port", 8678)
    use_http = http_mode or _mcp_config.get("http_enabled", False)

    features = _build_feature_list()

    if not use_http:
        _display_stdio_startup(features)
        mcp.run(show_banner=False)
        return

    _display_http_startup(host, port, features)
    mcp.run(
        transport="streamable-http",
        host=host,
        port=port,
        path="/mcp",
        stateless_http=True,
        show_banner=False,  # Disable Rich banner to avoid BlockingIOError
    )

864 

865 

866def _ensure_default_recommendations(priority_actions: list[str]) -> list[str]: 

867 """Ensure we always have default recommendations available.""" 

868 if not priority_actions: 

869 return [ 

870 "Run quality checks with `crackerjack lint`", 

871 "Check test coverage with `pytest --cov`", 

872 "Review recent git commits for patterns", 

873 ] 

874 return priority_actions 

875 

876 

877def _has_statistics_data( 

878 sessions: list[dict[str, Any]], 

879 interruptions: list[dict[str, Any]], 

880 snapshots: list[dict[str, Any]], 

881) -> bool: 

882 """Check if we have any statistics data to display.""" 

883 return bool(sessions or interruptions or snapshots) 

884 

885 

886def _parse_http_args(argv: list[str]) -> tuple[bool, int | None]: 

887 """Parse HTTP mode and port from argv.""" 

888 http_mode = "--http" in argv 

889 http_port = None 

890 

891 if "--http-port" in argv: 

892 port_idx = argv.index("--http-port") 

893 if port_idx + 1 < len(argv): 

894 http_port = int(argv[port_idx + 1]) 

895 

896 return http_mode, http_port 

897 

898 

899# Export ASGI app for uvicorn (standardized startup pattern) 

900http_app = mcp.http_app 

901 

902 

if __name__ == "__main__":
    import sys

    # Check for HTTP mode flags
    # --http selects the HTTP transport; --http-port <n> overrides the port.
    http_mode, http_port = _parse_http_args(sys.argv)
    main(http_mode, http_port)