Coverage report for session_buddy/server.py: 51.94% of 387 statements covered
(coverage.py v7.13.1, created at 2026-01-04 00:43 -0800)
1#!/usr/bin/env python3
2"""Claude Session Management MCP Server - FastMCP Version.
4A dedicated MCP server that provides session management functionality
5including initialization, checkpoints, and cleanup across all projects.
7This server can be included in any project's .mcp.json file to provide
8automatic access to /session-init, /session-checkpoint,
9and /session-end slash commands.
10"""
12from __future__ import annotations
14import asyncio
15import importlib.util
16import inspect
17import os
18import sys
19import warnings
20from contextlib import asynccontextmanager, suppress
21from pathlib import Path
22from typing import TYPE_CHECKING, Any
24if TYPE_CHECKING:
25 import logging
26 from collections.abc import AsyncGenerator
28 from mcp_common.exceptions import DependencyMissingError
# Suppress transformers warnings about PyTorch/TensorFlow for cleaner CLI output
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
warnings.filterwarnings("ignore", message=".*PyTorch.*TensorFlow.*Flax.*")

# DEBUG: Patch CallToolRequestParams to be hashable and log when hash is called
# This helps us identify where the unhashable type error is coming from
# NOTE(review): debug-only instrumentation loaded at import time from a sibling
# file; consider removing once the upstream unhashable-params bug is fixed.
from pathlib import Path as _PatchPath

_patch_file = _PatchPath(__file__).parent.parent / "patch_hashable.py"
if _patch_file.exists():
    import importlib.util as _util

    spec = _util.spec_from_file_location("patch_hashable", _patch_file)
    if spec and spec.loader:
        # Register under its own module name, then execute in place so the
        # patch is applied before FastMCP is imported further below.
        patch_module = _util.module_from_spec(spec)
        sys.modules["patch_hashable"] = patch_module
        spec.loader.exec_module(patch_module)
48# Phase 2.5: Import core infrastructure from server_core
49from session_buddy.core.features import get_feature_flags
50from session_buddy.core.lifecycle.service_registry import get_service_registry
51from session_buddy.core.permissions import SessionPermissionsManager
52from session_buddy.di import get_sync_typed
53from session_buddy.di.container import depends
54from session_buddy.server_core import _load_mcp_config
55from session_buddy.server_core import (
56 # Health & status functions
57 health_check as _health_check_impl,
58)
59from session_buddy.server_core import (
60 initialize_new_features as _initialize_new_features_impl,
61)
62from session_buddy.server_core import (
63 # Session lifecycle handler
64 session_lifecycle as _session_lifecycle_impl,
65)
66from session_buddy.utils.runtime_snapshots import (
67 RuntimeSnapshotManager,
68 run_snapshot_loop,
69)
72# Get logger using standard logging (avoid DI type resolution conflicts)
73def _get_session_logger() -> logging.Logger:
74 """Get logger using standard logging module."""
75 import logging
77 return logging.getLogger(__name__)
# Initialize global session_logger as None to prevent undefined variable
session_logger: logging.Logger | None = None


def _get_logger() -> logging.Logger:
    """Return the shared module logger, creating and caching it on first use."""
    global session_logger
    cached = session_logger
    if cached is None:
        # First call: build the logger once and memoize it in the global.
        cached = _get_session_logger()
        session_logger = cached
    return cached
# Check mcp-common exceptions availability (must be defined early for FastMCP import)
# find_spec only probes importability; the actual import happens just below.
EXCEPTIONS_AVAILABLE = importlib.util.find_spec("mcp_common.exceptions") is not None

if EXCEPTIONS_AVAILABLE:
    from mcp_common.exceptions import DependencyMissingError
100# Check token optimizer availability (Phase 3.3 M2: improved pattern)
101TOKEN_OPTIMIZER_AVAILABLE = (
102 importlib.util.find_spec("session_buddy.token_optimizer") is not None
103)
105if TOKEN_OPTIMIZER_AVAILABLE: 105 ↛ 114line 105 didn't jump to line 114 because the condition on line 105 was always true
106 from session_buddy.token_optimizer import (
107 get_cached_chunk,
108 get_token_usage_stats,
109 optimize_search_response,
110 track_token_usage,
111 )
112else:
113 # Fallback implementations when token optimizer unavailable
114 TOKEN_OPTIMIZER_AVAILABLE = False
116 async def optimize_search_response(
117 results: list[dict[str, Any]],
118 strategy: str = "prioritize_recent",
119 max_tokens: int = 4000,
120 ) -> tuple[list[dict[str, Any]], dict[str, Any]]:
121 return results, {}
123 async def track_token_usage(
124 operation: str,
125 request_tokens: int,
126 response_tokens: int,
127 optimization_applied: str | None = None,
128 ) -> None:
129 return None
131 async def get_cached_chunk(
132 cache_key: str,
133 chunk_index: int,
134 ) -> dict[str, Any] | None:
135 return None
137 async def get_token_usage_stats(hours: int = 24) -> dict[str, Any]:
138 return {"status": "token optimizer unavailable"}
140 async def optimize_memory_usage(
141 strategy: str = "auto",
142 max_age_days: int = 30,
143 dry_run: bool = True,
144 ) -> str:
145 return "❌ Token optimizer not available"
# Import FastMCP with test environment fallback
try:
    from fastmcp import FastMCP

    MCP_AVAILABLE = True
except ImportError:
    # Under pytest (or a test-runner script) fall back to the mock server so
    # the suite can run without fastmcp installed.
    if "pytest" in sys.modules or "test" in sys.argv[0].lower():
        from tests.conftest import MockFastMCP

        FastMCP = MockFastMCP  # type: ignore[no-redef,misc]
        MCP_AVAILABLE = False
    elif EXCEPTIONS_AVAILABLE:
        # Outside tests the dependency is hard-required: fail loudly with an
        # actionable install command.
        raise DependencyMissingError(
            message="FastMCP is required but not installed",
            dependency="fastmcp",
            install_command="uv add fastmcp",
        )
    else:
        # Fallback to sys.exit if exceptions unavailable
        sys.exit(1)
# Phase 2.6: Get all feature flags from centralized detector
# Flags are materialized into module-level constants so tools can check them
# cheaply without re-querying the detector.
_features = get_feature_flags()
SESSION_MANAGEMENT_AVAILABLE = _features["SESSION_MANAGEMENT_AVAILABLE"]
REFLECTION_TOOLS_AVAILABLE = _features["REFLECTION_TOOLS_AVAILABLE"]
ENHANCED_SEARCH_AVAILABLE = _features["ENHANCED_SEARCH_AVAILABLE"]
UTILITY_FUNCTIONS_AVAILABLE = _features["UTILITY_FUNCTIONS_AVAILABLE"]
MULTI_PROJECT_AVAILABLE = _features["MULTI_PROJECT_AVAILABLE"]
ADVANCED_SEARCH_AVAILABLE = _features["ADVANCED_SEARCH_AVAILABLE"]
CONFIG_AVAILABLE = _features["CONFIG_AVAILABLE"]
AUTO_CONTEXT_AVAILABLE = _features["AUTO_CONTEXT_AVAILABLE"]
MEMORY_OPTIMIZER_AVAILABLE = _features["MEMORY_OPTIMIZER_AVAILABLE"]
APP_MONITOR_AVAILABLE = _features["APP_MONITOR_AVAILABLE"]
LLM_PROVIDERS_AVAILABLE = _features["LLM_PROVIDERS_AVAILABLE"]
SERVERLESS_MODE_AVAILABLE = _features["SERVERLESS_MODE_AVAILABLE"]
CRACKERJACK_INTEGRATION_AVAILABLE = _features["CRACKERJACK_INTEGRATION_AVAILABLE"]

# Global feature instances (initialized on-demand)
# These are populated by initialize_new_features() / _get_permissions_manager().
multi_project_coordinator: Any = None
advanced_search_engine: Any = None
app_config: Any = None
current_project: str | None = None
permissions_manager: SessionPermissionsManager = None  # type: ignore[assignment]
def _get_permissions_manager() -> SessionPermissionsManager:
    """Resolve the permissions manager, caching it in the module global.

    Resolution order (each step is best-effort via ``suppress``):
    1. An already-registered manager from the DI container.
    2. A new manager built from DI-resolved ``SessionPaths``.
    3. A new manager built from default home-directory paths.
    Steps 2 and 3 register the new instance back into the container.
    """
    global permissions_manager

    # 1) Fast path: DI container already holds a manager instance.
    with suppress(Exception):
        manager = get_sync_typed(SessionPermissionsManager)
        if isinstance(manager, SessionPermissionsManager):
            permissions_manager = manager
            return manager

    from session_buddy.di.config import SessionPaths

    # 2) Build one from DI-provided paths and register it.
    with suppress(Exception):
        paths = get_sync_typed(SessionPaths)
        if isinstance(paths, SessionPaths):
            manager = SessionPermissionsManager(paths.claude_dir)
            depends.set(SessionPermissionsManager, manager)
            permissions_manager = manager
            return manager

    # 3) Last resort: derive paths from the home directory and register.
    paths = SessionPaths.from_home()
    paths.ensure_directories()
    manager = SessionPermissionsManager(paths.claude_dir)
    depends.set(SessionPermissionsManager, manager)
    permissions_manager = manager
    return manager
220# Import required components for automatic lifecycle
221from session_buddy.core import SessionLifecycleManager
222from session_buddy.reflection_tools import get_reflection_database
225# Token optimizer helpers (only used when available)
226def _build_memory_optimization_policy(
227 strategy: str, max_age_days: int
228) -> dict[str, Any]:
229 if strategy == "aggressive": 229 ↛ 230line 229 didn't jump to line 230 because the condition on line 229 was never true
230 importance_threshold = 0.3
231 elif strategy == "conservative": 231 ↛ 232line 231 didn't jump to line 232 because the condition on line 231 was never true
232 importance_threshold = 0.7
233 else:
234 importance_threshold = 0.5
236 return {
237 "consolidation_age_days": max_age_days,
238 "importance_threshold": importance_threshold,
239 }
242def _format_memory_optimization_results(results: dict[str, Any], dry_run: bool) -> str:
243 header = "🧠 Memory Optimization Results"
244 if dry_run:
245 header += " (DRY RUN)"
247 lines = [header]
248 lines.append(f"Total Conversations: {results.get('total_conversations', 0)}")
249 lines.append(f"Conversations to Keep: {results.get('conversations_to_keep', 0)}")
250 lines.append(
251 f"Conversations to Consolidate: {results.get('conversations_to_consolidate', 0)}"
252 )
253 lines.append(f"Clusters Created: {results.get('clusters_created', 0)}")
255 saved = results.get("space_saved_estimate")
256 if isinstance(saved, (int, float)):
257 lines.append(f"{saved:,.0f} characters saved")
259 ratio = results.get("compression_ratio")
260 if isinstance(ratio, (int, float)):
261 lines.append(f"{ratio * 100:.1f}% compression ratio")
263 summaries: list[Any] = results.get("consolidated_summaries") or []
264 if summaries:
265 first = summaries[0]
266 if isinstance(first, dict) and "original_count" in first:
267 lines.append(f"{first['original_count']} conversations → 1 summary")
269 if dry_run:
270 lines.append("Run with dry_run=False to apply changes")
272 return "\n".join(lines)
if TOKEN_OPTIMIZER_AVAILABLE:

    async def optimize_memory_usage(
        strategy: str = "auto",
        max_age_days: int = 30,
        dry_run: bool = True,
    ) -> str:
        """Compress reflection memory and return a formatted report.

        Requires both the reflection tools and the memory optimizer; any
        failure is reported as a "❌ ..." string rather than raised.
        """
        if not REFLECTION_TOOLS_AVAILABLE or not MEMORY_OPTIMIZER_AVAILABLE:
            return "❌ Memory optimization requires both token optimizer and reflection tools"

        try:
            db = await get_reflection_database()
            from session_buddy.memory_optimizer import MemoryOptimizer

            policy = _build_memory_optimization_policy(strategy, max_age_days)
            # Type ignore: get_reflection_database returns ReflectionDatabaseAdapterOneiric
            # which is compatible with MemoryOptimizer's expected type
            optimizer = MemoryOptimizer(db)  # type: ignore[arg-type]
            results = await optimizer.compress_memory(policy=policy, dry_run=dry_run)

            # The optimizer signals failures in-band via an "error" key.
            if isinstance(results, dict) and "error" in results:
                return f"❌ Memory optimization error: {results['error']}"

            return _format_memory_optimization_results(results, dry_run)
        except Exception as e:
            return f"❌ Error optimizing memory: {e}"
# Check mcp-common ServerPanels availability (Phase 3.3 M2: improved pattern)
# find_spec probes importability without importing; actual imports are lazy.
SERVERPANELS_AVAILABLE = importlib.util.find_spec("mcp_common.ui") is not None

# Check mcp-common security availability (Phase 3.3 M2: improved pattern)
SECURITY_AVAILABLE = importlib.util.find_spec("mcp_common.security") is not None

# Check FastMCP rate limiting middleware availability (Phase 3.3 M2: improved pattern)
RATE_LIMITING_AVAILABLE = (
    importlib.util.find_spec("fastmcp.server.middleware.rate_limiting") is not None
)
314# Phase 2.2: Import utility and formatting functions from server_helpers
def _get_lifecycle_manager() -> SessionLifecycleManager:
    """Resolve the lifecycle manager from DI, registering a fresh one on miss."""
    # Best-effort DI lookup; any container error falls through to creation.
    with suppress(Exception):
        candidate = get_sync_typed(SessionLifecycleManager)
        if isinstance(candidate, SessionLifecycleManager):
            return candidate
    fresh = SessionLifecycleManager()
    depends.set(SessionLifecycleManager, fresh)
    return fresh
# Lifespan handler wrapper for FastMCP
@asynccontextmanager
async def session_lifecycle(app: Any) -> AsyncGenerator[None]:
    """Automatic session lifecycle for git repositories only (wrapper).

    Startup: initialize registered services, enter the core lifecycle
    implementation, write initial health/telemetry snapshots, and start a
    periodic snapshot refresh task. Shutdown (after ``yield``): cancel the
    refresh task, write final snapshots, and clean up registered services.
    """
    registry = get_service_registry()
    await registry.init_all()
    lifecycle_manager = _get_lifecycle_manager()
    async with _session_lifecycle_impl(app, lifecycle_manager, _get_logger()):  # type: ignore[arg-type]
        snapshot_manager = RuntimeSnapshotManager.for_server("session-buddy")
        pid = os.getpid()
        snapshot_manager.record("startup_events")
        snapshot_manager.write_health_snapshot(pid=pid, watchers_running=True)
        snapshot_manager.write_telemetry_snapshot(pid=pid)

        # Refresh at half the health TTL (floor of 5s) so the health snapshot
        # never expires while the server is alive.
        interval_seconds = max(snapshot_manager.settings.health_ttl_seconds / 2, 5.0)
        snapshot_task = asyncio.create_task(
            run_snapshot_loop(snapshot_manager, pid, interval_seconds),
        )

        try:
            yield
        finally:
            snapshot_task.cancel()
            with suppress(asyncio.CancelledError):
                await snapshot_task
            snapshot_manager.record("shutdown_events")
            snapshot_manager.write_health_snapshot(pid=pid, watchers_running=False)
            snapshot_manager.write_telemetry_snapshot(pid=pid)
    # NOTE(review): service cleanup appears to run after the lifecycle impl
    # context has exited -- confirm this ordering against server_core.
    await registry.cleanup_all()
# Load configuration and initialize FastMCP 2.0 server with lifespan
_mcp_config = _load_mcp_config()

# Initialize MCP server with lifespan; session_lifecycle handles service
# startup/teardown and runtime snapshots around the server's lifetime.
mcp = FastMCP("session-buddy", lifespan=session_lifecycle)
364# Add rate limiting middleware (Phase 3 Security Hardening)
365# NOTE: Disabled temporarily due to FastMCP bug where MiddlewareContext becomes unhashable
366# when it contains CallToolRequestParams (which has __hash__ = None).
367# See: https://github.com/jlowin/fastmcp/issues for tracking
368# if RATE_LIMITING_AVAILABLE:
369# from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware
370#
371# rate_limiter = RateLimitingMiddleware(
372# max_requests_per_second=10.0, # Sustainable rate for session management operations
373# burst_capacity=30, # Allow bursts for checkpoint/status operations
374# global_limit=True, # Protect the session management server globally
375# )
376# # Use public API (Phase 3.1 C1 fix: standardize middleware access)
377# mcp.add_middleware(rate_limiter)
378# _get_logger().info("Rate limiting enabled: 10 req/sec, burst 30")
380# Register extracted tool modules following crackerjack architecture patterns
381# Import LLM provider validation (Phase 3 Security Hardening)
382from session_buddy.config.feature_flags import get_feature_flags as get_rollout_flags
383from session_buddy.memory.migration import (
384 migrate_v1_to_v2 as _migrate_v1_to_v2,
385)
386from session_buddy.memory.migration import (
387 needs_migration as _needs_migration,
388)
390from .llm.security import validate_llm_api_keys_at_startup
391from .tools import (
392 register_access_log_tools,
393 register_conscious_agent_tools,
394 register_crackerjack_tools,
395 register_extraction_tools,
396 register_feature_flags_tools,
397 register_knowledge_graph_tools,
398 register_llm_tools,
399 register_migration_tools,
400 register_monitoring_tools,
401 register_prompt_tools,
402 register_search_tools,
403 register_serverless_tools,
404 register_session_tools,
405 register_team_tools,
406)
408# Import utility functions
409from .utils import (
410 _format_search_results,
411 validate_claude_directory,
412)
# Register all extracted tool modules
# Each register_* call attaches that module's MCP tools to the shared server.
register_access_log_tools(mcp)
register_conscious_agent_tools(mcp)
register_crackerjack_tools(mcp)
register_extraction_tools(mcp)
register_feature_flags_tools(mcp)
register_knowledge_graph_tools(mcp)  # DuckPGQ knowledge graph tools
register_llm_tools(mcp)
register_migration_tools(mcp)
register_monitoring_tools(mcp)
register_prompt_tools(mcp)
register_search_tools(mcp)
register_serverless_tools(mcp)
register_session_tools(mcp)
register_team_tools(mcp)
# Add helper method for programmatic tool calling used in tests
async def _resolve_tool_registry(mcp_instance: Any) -> dict[str, Any]:
    """Return the registered tool mapping for an MCP instance.

    Tries, in order: the async ``get_tools()`` accessor (real FastMCP), a
    plain ``tools`` attribute (test doubles), then the private ``_tools``
    registry, finally an empty mapping.
    """
    if hasattr(mcp_instance, "get_tools"):
        return await mcp_instance.get_tools()  # type: ignore[no-any-return]
    if hasattr(mcp_instance, "tools"):
        return mcp_instance.tools  # type: ignore[no-any-return]
    return getattr(mcp_instance, "_tools", {})
441def _resolve_tool_callable(tool_spec: Any, tool_name: str) -> Any:
442 """Extract the callable implementation from a tool spec."""
443 if hasattr(tool_spec, "function"):
444 return tool_spec.function
445 if isinstance(tool_spec, dict) and "function" in tool_spec:
446 return tool_spec["function"]
447 if callable(tool_spec):
448 return tool_spec
450 candidate = getattr(tool_spec, "implementation", None) or getattr(
451 tool_spec, "handler", None
452 )
453 if candidate is None:
454 msg = f"Could not extract callable function from tool {tool_name}"
455 raise ValueError(msg)
456 return candidate
459def _build_tool_arguments(
460 tool_func: Any, provided_args: dict[str, Any]
461) -> dict[str, Any]:
462 """Filter provided args to match the callable signature."""
463 sig = inspect.signature(tool_func)
464 filtered_args: dict[str, Any] = {}
465 for param_name, param in sig.parameters.items():
466 if param_name in provided_args:
467 filtered_args[param_name] = provided_args[param_name]
468 elif param.default is not param.empty:
469 filtered_args[param_name] = param.default
470 return filtered_args
async def _call_registered_tool(
    mcp_instance: Any, tool_name: str, arguments: dict[str, Any] | None = None
) -> Any:
    """Programmatically call a tool by name with provided arguments.

    Raises:
        ValueError: if no tool with ``tool_name`` is registered.
    """
    registry = await _resolve_tool_registry(mcp_instance)
    if tool_name not in registry:
        msg = f"Tool '{tool_name}' is not registered"
        raise ValueError(msg)

    func = _resolve_tool_callable(registry[tool_name], tool_name)
    call_args = _build_tool_arguments(func, arguments or {})

    # Await coroutine tools; invoke synchronous tools directly.
    if inspect.iscoroutinefunction(func):
        return await func(**call_args)
    return func(**call_args)
# Attach the method to the mcp instance as a bound method
async def _call_tool_bound(
    tool_name: str, arguments: dict[str, Any] | None = None
) -> Any:
    """Bound _call_tool method for the mcp instance.

    Closes over the module-level ``mcp`` server. Kept for programmatic /
    test-driven invocation only; it is intentionally NOT installed as
    ``mcp._call_tool`` (see the warning comment below in this file).
    """
    return await _call_registered_tool(mcp, tool_name, arguments)
500# CRITICAL: DO NOT override mcp._call_tool - it breaks FastMCP's internal middleware handling!
501# FastMCP's _call_tool expects MiddlewareContext, our function expects string tool_name
502# mcp._call_tool = _call_tool_bound # DISABLED - causes "Tool 'MiddlewareContext(...)' is not registered"
async def reflect_on_past(
    query: str,
    limit: int = 5,
    min_score: float = 0.7,
    project: str | None = None,
    optimize_tokens: bool = True,
    max_tokens: int = 4000,
) -> str:
    """Search past conversations with optional token optimization.

    Args:
        query: Free-text search query.
        limit: Maximum number of conversations to return.
        min_score: Minimum relevance score for a match.
        project: Project to search; falls back to the current project.
        optimize_tokens: Apply response optimization when the optimizer exists.
        max_tokens: Token budget passed to the optimizer.

    Returns:
        A formatted result string; failures return "❌ ..." messages
        instead of raising.
    """
    # Check if reflection tools are available
    if not REFLECTION_TOOLS_AVAILABLE:
        return "❌ Reflection tools not available. Install dependencies: uv sync --extra embeddings"

    # Initialize database
    db = await _initialize_reflection_database()
    if not db:
        return "❌ Reflection system not available. Install optional dependencies with `uv sync --extra embeddings`"

    # Search conversations
    results = await _search_conversations(db, query, project, limit, min_score)
    if isinstance(results, str):  # Error occurred
        return results

    # Optimize tokens if requested
    optimization_info = {}
    if optimize_tokens and TOKEN_OPTIMIZER_AVAILABLE:
        results, optimization_info = await _optimize_results(results, max_tokens)

    # Format and return output
    return _format_reflection_output(query, results, optimization_info)
async def _initialize_reflection_database() -> Any | None:
    """Initialize the reflection database with error handling.

    Returns the database adapter, or ``None`` when initialization fails;
    the failure is logged rather than propagated.
    """
    try:
        return await get_reflection_database()
    except Exception as exc:  # pragma: no cover - defensive logging
        _get_logger().exception(
            "Failed to initialize reflection database",
            exc_info=exc,
        )
        return None
async def _search_conversations(
    db: Any,
    query: str,
    project: str | None,
    limit: int,
    min_score: float,
) -> Any | str:
    """Search conversations and handle errors.

    Returns the raw search results on success, or an error string (callers
    detect failure via ``isinstance(result, str)``). Falls back to the
    module-level ``current_project`` when no project is given.
    """
    try:
        async with db:
            return await db.search_conversations(
                query=query,
                project=project or current_project,
                limit=limit,
                min_score=min_score,
            )
    except Exception as exc:
        _get_logger().exception("Reflection search failed", extra={"query": query})
        return f"❌ Error searching conversations: {exc}"
async def _optimize_results(
    results: list[dict[str, Any]],
    max_tokens: int,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Optimize results with token optimization.

    Returns the (possibly reduced) results plus the optimizer's metadata
    dict; on any failure the original results and ``{}`` come back so a
    failed optimization never breaks the search itself.
    """
    try:
        optimized_results, optimization_info = await optimize_search_response(
            results,
            strategy="prioritize_recent",
            max_tokens=max_tokens,
        )
        # Only adopt the optimized list when it is non-empty.
        if optimized_results:
            results = optimized_results

        # Record usage; response tokens are estimated as the budget minus
        # whatever the optimizer reports as saved.
        token_savings = optimization_info.get("token_savings", {})
        await track_token_usage(
            operation="reflect_on_past",
            request_tokens=max_tokens,
            response_tokens=max_tokens - token_savings.get("tokens_saved", 0),
            optimization_applied=optimization_info.get("strategy"),
        )
        return results, optimization_info
    except Exception as exc:
        _get_logger().warning(
            "Token optimization failed for reflect_on_past",
            extra={"error": str(exc)},
        )
        return results, {}
def _format_reflection_output(
    query: str,
    results: list[dict[str, Any]],
    optimization_info: dict[str, Any],
) -> str:
    """Format reflection search results for display.

    Emits a "no results" hint when nothing matched; otherwise a header,
    an optional token-savings line, and the rendered result entries.
    """
    if not results:
        return (
            f"🔍 No relevant conversations found for query: '{query}'\n"
            "💡 Try adjusting the search terms or lowering min_score."
        )

    lines = [
        f"🔍 **Search Results for: '{query}'**",
        "",
        f"📊 Found {len(results)} relevant conversations",
        "",
    ]

    savings = None
    if isinstance(optimization_info, dict):
        savings = optimization_info.get("token_savings")
    if savings and savings.get("savings_percentage") is not None:
        lines.append(
            f"⚡ Token optimization: {savings.get('savings_percentage')}% saved"
        )
        lines.append("")

    lines.extend(_format_search_results(results))
    return "\n".join(lines)
# Wrapper for initialize_new_features that manages global state
async def initialize_new_features() -> None:
    """Initialize multi-project coordination and advanced search features (wrapper).

    Delegates to the ``server_core`` implementation and stores the returned
    instances in the module globals so other tools in this module can use them.
    """
    global multi_project_coordinator, advanced_search_engine, app_config

    # Get the initialized instances from the implementation; current values
    # are passed in so already-initialized features are preserved.
    (
        multi_project_coordinator,
        advanced_search_engine,
        app_config,
    ) = await _initialize_new_features_impl(
        _get_logger(),  # type: ignore[arg-type]
        multi_project_coordinator,
        advanced_search_engine,
        app_config,
    )
654# Phase 2.3: Import quality engine functions
655from session_buddy.quality_engine import (
656 calculate_quality_score as _calculate_quality_score_impl,
657)
# Expose quality scoring function for external use
async def calculate_quality_score(project_dir: Path | None = None) -> dict[str, Any]:
    """Calculate session quality score using V2 algorithm.

    Args:
        project_dir: Project directory to score. Defaults to the logical
            working directory ($PWD when set, else ``Path.cwd()``).

    Returns:
        Dict with quality score and breakdown information.
    """
    if project_dir is None:
        # Prefer $PWD so symlinked working directories are respected.
        project_dir = Path(os.environ.get("PWD", Path.cwd()))

    return await _calculate_quality_score_impl(project_dir=project_dir)
# Wrapper for health_check that provides required parameters
async def health_check() -> dict[str, Any]:
    """Comprehensive health check for MCP server and toolkit availability (wrapper).

    Supplies the logger, permissions manager, and directory validator to the
    ``server_core`` implementation and returns its status dict.
    """
    return await _health_check_impl(
        _get_logger(),  # type: ignore[arg-type]
        _get_permissions_manager(),
        validate_claude_directory,
    )
690# Phase 2.4: Import advanced feature tools from advanced_features module
691from session_buddy.advanced_features import (
692 add_project_dependency,
693 # Advanced Search Tools (3 MCP tools)
694 advanced_search,
695 cancel_user_reminder,
696 # Natural Language Scheduling Tools (5 MCP tools)
697 create_natural_reminder,
698 # Multi-Project Coordination Tools (4 MCP tools)
699 create_project_group,
700 # Interruption Management Tools (1 MCP tool)
701 get_interruption_statistics,
702 get_project_insights,
703 get_search_metrics,
704 # Git Worktree Management Tools (3 MCP tools)
705 git_worktree_add,
706 git_worktree_remove,
707 git_worktree_switch,
708 list_user_reminders,
709 search_across_projects,
710 search_suggestions,
711 # Session Welcome Tool (1 MCP tool)
712 session_welcome,
713 start_reminder_service,
714 stop_reminder_service,
715)
# Register all 17 advanced MCP tools
# Natural-language scheduling (5) + interruption stats (1)
mcp.tool()(create_natural_reminder)
mcp.tool()(list_user_reminders)
mcp.tool()(cancel_user_reminder)
mcp.tool()(start_reminder_service)
mcp.tool()(stop_reminder_service)
mcp.tool()(get_interruption_statistics)
# Multi-project coordination (4)
mcp.tool()(create_project_group)
mcp.tool()(add_project_dependency)
mcp.tool()(search_across_projects)
mcp.tool()(get_project_insights)
# Advanced search (3)
mcp.tool()(advanced_search)
mcp.tool()(search_suggestions)
mcp.tool()(get_search_metrics)
# Git worktree management (3) + session welcome (1)
mcp.tool()(git_worktree_add)
mcp.tool()(git_worktree_remove)
mcp.tool()(git_worktree_switch)
mcp.tool()(session_welcome)
def _perform_startup_validation() -> None:
    """Perform startup validation checks (LLM API keys).

    Validation problems are logged but never fatal: LLM providers are an
    optional feature and the server should start without them.
    """
    if not LLM_PROVIDERS_AVAILABLE:
        return

    try:
        validate_llm_api_keys_at_startup()
    except (ImportError, ValueError) as e:
        _get_logger().warning(
            f"LLM API key validation skipped (optional feature): {e}",
        )
    except Exception:
        _get_logger().exception("Unexpected error during LLM validation")
def _initialize_features() -> None:
    """Initialize optional features on startup.

    Runs the (optional) v1→v2 schema auto-migration first, then the async
    feature initialization. Every failure path is logged and swallowed so
    optional features can never prevent server startup.
    """
    try:
        # Optionally run auto-migration when v2 is enabled via rollout flags
        try:
            flags = get_rollout_flags()
            if flags.use_schema_v2 and _needs_migration():
                _get_logger().info("Auto-migration: v1 detected, migrating to v2...")
                res = _migrate_v1_to_v2()
                if res.success:
                    _get_logger().info("Migration complete", extra={"stats": res.stats})
                else:
                    # A failed migration is non-fatal; keep the legacy schema.
                    _get_logger().warning(
                        "Migration failed; continuing with legacy schema",
                        extra={"error": res.error},
                    )
        except Exception as e:
            _get_logger().warning(f"Migration check error (optional): {e}")

        asyncio.run(initialize_new_features())
    except (ImportError, RuntimeError) as e:
        _get_logger().warning(f"Feature initialization skipped (optional): {e}")
    except Exception:
        _get_logger().exception("Unexpected error during feature init")
def _build_feature_list() -> list[str]:
    """Build the list of available features for startup display."""
    features = [
        "Session Lifecycle Management",
        "Memory & Reflection System",
        "Crackerjack Quality Integration",
        "Knowledge Graph (DuckPGQ)",
        "LLM Provider Management",
    ]
    # Optional capabilities are advertised only when their dependency exists.
    optional = (
        (SECURITY_AVAILABLE, "🔒 API Key Validation (OpenAI/Gemini)"),
        (RATE_LIMITING_AVAILABLE, "⚡ Rate Limiting (10 req/sec, burst 30)"),
    )
    features.extend(label for available, label in optional if available)
    return features
def _display_http_startup(host: str, port: int, features: list[str]) -> None:
    """Display HTTP mode startup information.

    Uses the rich ServerPanels UI when mcp-common is installed, otherwise
    prints a plain summary to stderr (stdout is reserved for transports).
    """
    if SERVERPANELS_AVAILABLE:
        from mcp_common.ui import ServerPanels

        ServerPanels.startup_success(
            server_name="Session Management MCP",
            version="2.0.0",
            features=features,
            endpoint=f"http://{host}:{port}/mcp",
            websocket_monitor=str(_mcp_config.get("websocket_monitor_port", 8677)),
            transport="HTTP (streamable)",
        )
    else:
        # Fallback to simple print when ServerPanels not available
        print("✅ Session Management MCP v2.0.0", file=sys.stderr)
        print(f"🔗 Endpoint: http://{host}:{port}/mcp", file=sys.stderr)
        print("📡 Transport: HTTP (streamable)", file=sys.stderr)
        if features:
            print(f"🎯 Features: {', '.join(features)}", file=sys.stderr)
def _display_stdio_startup(features: list[str]) -> None:
    """Display STDIO mode startup information.

    Mirrors :func:`_display_http_startup` but for the Claude Desktop STDIO
    transport; all fallback output goes to stderr because stdout carries
    the MCP protocol stream.
    """
    if SERVERPANELS_AVAILABLE:
        from mcp_common.ui import ServerPanels

        ServerPanels.startup_success(
            server_name="Session Management MCP",
            version="2.0.0",
            features=features,
            transport="STDIO",
            mode="Claude Desktop",
        )
    else:
        # Fallback to simple print when ServerPanels not available
        print("✅ Session Management MCP v2.0.0", file=sys.stderr)
        print("📡 Transport: STDIO (Claude Desktop)", file=sys.stderr)
        if features:
            print(f"🎯 Features: {', '.join(features)}", file=sys.stderr)
def main(http_mode: bool = False, http_port: int | None = None) -> None:
    """Main entry point for the MCP server.

    Args:
        http_mode: Force streamable-HTTP transport (otherwise the config's
            ``http_enabled`` flag decides; default is STDIO).
        http_port: Port override for HTTP mode; falls back to config/8678.
    """
    # Perform startup validation and initialization
    _perform_startup_validation()
    _initialize_features()

    # Get configuration
    host = _mcp_config.get("http_host", "127.0.0.1")
    port = http_port or _mcp_config.get("http_port", 8678)
    use_http = http_mode or _mcp_config.get("http_enabled", False)

    # Build feature list for display
    features = _build_feature_list()

    # Display startup information and run server
    if use_http:
        _display_http_startup(host, port, features)
        mcp.run(
            transport="streamable-http",
            host=host,
            port=port,
            path="/mcp",
            stateless_http=True,
            show_banner=False,  # Disable Rich banner to avoid BlockingIOError
        )
    else:
        _display_stdio_startup(features)
        mcp.run(show_banner=False)
866def _ensure_default_recommendations(priority_actions: list[str]) -> list[str]:
867 """Ensure we always have default recommendations available."""
868 if not priority_actions:
869 return [
870 "Run quality checks with `crackerjack lint`",
871 "Check test coverage with `pytest --cov`",
872 "Review recent git commits for patterns",
873 ]
874 return priority_actions
877def _has_statistics_data(
878 sessions: list[dict[str, Any]],
879 interruptions: list[dict[str, Any]],
880 snapshots: list[dict[str, Any]],
881) -> bool:
882 """Check if we have any statistics data to display."""
883 return bool(sessions or interruptions or snapshots)
886def _parse_http_args(argv: list[str]) -> tuple[bool, int | None]:
887 """Parse HTTP mode and port from argv."""
888 http_mode = "--http" in argv
889 http_port = None
891 if "--http-port" in argv:
892 port_idx = argv.index("--http-port")
893 if port_idx + 1 < len(argv):
894 http_port = int(argv[port_idx + 1])
896 return http_mode, http_port
# Export ASGI app for uvicorn (standardized startup pattern)
# NOTE(review): ``mcp.http_app`` is exported as-is; if FastMCP exposes this as
# a factory method rather than an app instance, uvicorn must be pointed at it
# with --factory. Confirm against the FastMCP version in use.
http_app = mcp.http_app


if __name__ == "__main__":
    # ``sys`` is already imported at module top; the redundant local
    # ``import sys`` was removed. Parse CLI flags and start the server.
    http_mode, http_port = _parse_http_args(sys.argv)
    main(http_mode, http_port)