Coverage for session_buddy / serverless_mode.py: 70.69%
94 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Stateless/Serverless Mode for Session Management MCP Server.
4Enables request-scoped sessions with Oneiric storage backends and keeps session
5state external to the request lifecycle.
6"""
8import hashlib
9import json
10import logging
11from datetime import datetime
12from pathlib import Path
13from typing import Any
15from session_buddy.adapters.serverless_storage_adapter import ServerlessStorageAdapter
16from session_buddy.backends import SessionState, SessionStorage
18CONFIG_LOGGER = logging.getLogger("serverless.config")
21class ServerlessSessionManager:
22 """Main session manager for serverless/stateless operation."""
24 def __init__(self, storage_backend: SessionStorage) -> None:
25 self.storage = storage_backend
26 self.logger = logging.getLogger("serverless.session_manager")
27 self.session_cache: dict[
28 str,
29 SessionState,
30 ] = {} # In-memory cache for current request
32 async def create_session(
33 self,
34 user_id: str,
35 project_id: str,
36 session_data: dict[str, Any] | None = None,
37 ttl_hours: int = 24,
38 ) -> str:
39 """Create new session."""
40 session_id = self._generate_session_id(user_id, project_id)
42 session_state = SessionState(
43 session_id=session_id,
44 user_id=user_id,
45 project_id=project_id,
46 created_at=datetime.now().isoformat(),
47 last_activity=datetime.now().isoformat(),
48 permissions=[],
49 conversation_history=[],
50 reflection_data={},
51 app_monitoring_state={},
52 llm_provider_configs={},
53 metadata=session_data or {},
54 )
56 # Store with TTL
57 ttl_seconds = ttl_hours * 3600
58 success = await self.storage.store_session(session_state, ttl_seconds)
60 if success: 60 ↛ 63line 60 didn't jump to line 63 because the condition on line 60 was always true
61 self.session_cache[session_id] = session_state
62 return session_id
63 msg = "Failed to create session"
64 raise RuntimeError(msg)
66 async def get_session(self, session_id: str) -> SessionState | None:
67 """Get session state."""
68 # Check cache first
69 if session_id in self.session_cache: 69 ↛ 73line 69 didn't jump to line 73 because the condition on line 69 was always true
70 return self.session_cache[session_id]
72 # Load from storage
73 session_state = await self.storage.retrieve_session(session_id)
74 if session_state:
75 self.session_cache[session_id] = session_state
77 return session_state
79 async def update_session(
80 self,
81 session_id: str,
82 updates: dict[str, Any],
83 ttl_hours: int | None = None,
84 ) -> bool:
85 """Update session state."""
86 session_state = await self.get_session(session_id)
87 if not session_state: 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true
88 return False
90 # Apply updates
91 for key, value in updates.items():
92 if hasattr(session_state, key): 92 ↛ 91line 92 didn't jump to line 91 because the condition on line 92 was always true
93 setattr(session_state, key, value)
95 # Update last activity
96 session_state.last_activity = datetime.now().isoformat()
98 # Store updated state
99 ttl_seconds = ttl_hours * 3600 if ttl_hours else None
100 success = await self.storage.store_session(session_state, ttl_seconds)
102 if success: 102 ↛ 105line 102 didn't jump to line 105 because the condition on line 102 was always true
103 self.session_cache[session_id] = session_state
105 return success
107 async def delete_session(self, session_id: str) -> bool:
108 """Delete session."""
109 # Remove from cache
110 self.session_cache.pop(session_id, None)
112 # Delete from storage
113 return await self.storage.delete_session(session_id)
115 async def list_user_sessions(self, user_id: str) -> list[str]:
116 """List sessions for user."""
117 return await self.storage.list_sessions(user_id=user_id)
119 async def list_project_sessions(self, project_id: str) -> list[str]:
120 """List sessions for project."""
121 return await self.storage.list_sessions(project_id=project_id)
123 async def cleanup_sessions(self) -> int:
124 """Clean up expired sessions."""
125 return await self.storage.cleanup_expired_sessions()
127 def _generate_session_id(self, user_id: str, project_id: str) -> str:
128 """Generate unique session ID."""
129 timestamp = datetime.now().isoformat()
130 data = f"{user_id}:{project_id}:{timestamp}"
131 return hashlib.sha256(data.encode()).hexdigest()[:16]
133 def get_session_stats(self) -> dict[str, Any]:
134 """Get session statistics."""
135 return {
136 "cached_sessions": len(self.session_cache),
137 "storage_backend": self.storage.__class__.__name__,
138 "storage_config": {
139 k: v for k, v in self.storage.config.items() if "key" not in k.lower()
140 },
141 }
144class ServerlessConfigManager:
145 """Manages configuration for serverless mode."""
147 @staticmethod
148 def load_config(config_path: str | None = None) -> dict[str, Any]:
149 """Load serverless configuration."""
150 default_config = {
151 "storage_backend": "file",
152 "session_ttl_hours": 24,
153 "cleanup_interval_hours": 6,
154 "backends": {
155 "file": {
156 "storage_dir": str(Path.home() / ".claude" / "data" / "sessions"),
157 },
158 "memory": {},
159 },
160 }
162 if config_path and Path(config_path).exists():
163 try:
164 with open(config_path, encoding="utf-8") as f:
165 file_config = json.load(f)
166 default_config.update(file_config)
167 except (OSError, json.JSONDecodeError):
168 pass
170 return default_config
172 @staticmethod
173 def create_storage_backend(config: dict[str, Any]) -> SessionStorage:
174 """Create storage backend from config.
176 Supports Oneiric storage adapters registered in the storage registry.
178 Recommended backends:
179 - "file": Local file storage (default, best for development)
180 - "memory": In-memory storage (testing only)
181 """
182 backend_type = config.get("storage_backend", "file") # Default to file
183 backend_config = config.get("backends", {}).get(backend_type, {})
185 if backend_type in ("file", "memory"):
186 CONFIG_LOGGER.info(
187 f"Using Oneiric storage adapter: {backend_type}",
188 )
189 return ServerlessStorageAdapter(config=backend_config, backend=backend_type)
191 msg = (
192 f"Unsupported storage backend: {backend_type}. "
193 "Supported backends: file, memory."
194 )
195 raise ValueError(msg)
197 @staticmethod
198 async def test_storage_backends(config: dict[str, Any]) -> dict[str, bool]:
199 """Test all configured storage backends."""
200 results: dict[str, bool] = {}
202 for backend_name, backend_config in config.get("backends", {}).items():
203 try:
204 storage: SessionStorage
205 match backend_name:
206 case "file" | "memory": 206 ↛ 210line 206 didn't jump to line 210 because the pattern on line 206 always matched
207 storage = ServerlessStorageAdapter(
208 config=backend_config, backend=backend_name
209 )
210 case _:
211 results[backend_name] = False
212 continue
214 results[backend_name] = await storage.is_available()
216 except Exception:
217 results[backend_name] = False
219 return results