Coverage for session_buddy / adapters / serverless_storage_adapter.py: 84.51%
110 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
"""Serverless storage adapter bridge for session state persistence.

This module provides a bridge between the old SessionStorage protocol (used by
serverless_mode.py) and the Oneiric SessionStorageAdapter. This enables
serverless deployments to use Oneiric storage adapters while maintaining API
compatibility.

Architecture:
    Old: serverless_mode.py → SessionStorage protocol → legacy backends
    New: serverless_mode.py → ServerlessStorageAdapter → SessionStorageAdapter → Oneiric storage

Example:
    >>> from session_buddy.adapters import ServerlessStorageAdapter
    >>> storage = ServerlessStorageAdapter(backend="file")
    >>> await storage.store_session(session_state, ttl_seconds=3600)
    True

"""
20from __future__ import annotations
22import json
23import logging
24import typing as t
25from datetime import datetime, timedelta
27from session_buddy.backends.base import SessionState, SessionStorage
29if t.TYPE_CHECKING:
30 from session_buddy.adapters.session_storage_adapter import (
31 SessionStorageAdapter,
32 )
class ServerlessStorageAdapter(SessionStorage):
    """Bridge adapter implementing SessionStorage protocol using SessionStorageAdapter.

    This adapter maintains backward compatibility with serverless_mode.py while
    using the Oneiric SessionStorageAdapter underneath.

    Attributes:
        backend: Storage backend name, passed unchanged to SessionStorageAdapter.
            The set of supported names is defined by SessionStorageAdapter,
            not by this class (examples in this file use "file" and "s3").
        _storage: Internal SessionStorageAdapter instance, created lazily on
            first use by ``_ensure_storage``.
        _session_metadata: In-memory cache of per-session metadata (user,
            project, creation time, TTL). It is only populated by
            ``store_session`` on this instance, so ``list_sessions`` and
            ``cleanup_expired_sessions`` only see sessions stored through it.

    Example:
        >>> adapter = ServerlessStorageAdapter(backend="s3")
        >>> await adapter.store_session(session_state, ttl_seconds=3600)
        >>> state = await adapter.retrieve_session("session_123")

    """

    def __init__(self, config: dict[str, t.Any] | None = None, backend: str = "file"):
        """Initialize serverless storage adapter.

        Args:
            config: Legacy config dict; forwarded to the base class (an empty
                dict is used when None is given).
            backend: Storage backend name handed to SessionStorageAdapter.

        """
        super().__init__(config or {})
        self.backend = backend
        self._storage: SessionStorageAdapter | None = None
        self._session_metadata: dict[str, dict[str, t.Any]] = {}
        self.logger = logging.getLogger(f"serverless.storage.{backend}")

    async def _ensure_storage(self) -> SessionStorageAdapter:
        """Ensure storage adapter is initialized.

        Returns:
            Initialized SessionStorageAdapter instance

        """
        if self._storage is None:
            # Imported lazily; at module level the name is only imported
            # under TYPE_CHECKING.
            from session_buddy.adapters.session_storage_adapter import (
                SessionStorageAdapter,
            )

            self._storage = SessionStorageAdapter(backend=self.backend)

        return self._storage

    @staticmethod
    def _is_expired(expires_at_str: t.Any) -> bool:
        """Tell whether a cached ISO-8601 expiry timestamp lies in the past.

        Args:
            expires_at_str: Value of a metadata ``expires_at`` entry; may be
                None or empty when the session has no TTL.

        Returns:
            True only when the value parses as a timestamp earlier than now.
            Missing or unparseable values are treated as "not expired" so a
            bad cache entry never causes a session to be dropped or a listing
            to fail.

        """
        if not expires_at_str:
            return False
        try:
            expires_at = datetime.fromisoformat(expires_at_str)
        except (TypeError, ValueError):
            return False
        # Compare in the timestamp's own timezone: naive timestamps (what
        # store_session writes) are compared against naive local time, aware
        # ones against aware time, so the comparison cannot raise TypeError.
        return datetime.now(expires_at.tzinfo) > expires_at

    async def store_session(
        self,
        session_state: SessionState,
        ttl_seconds: int | None = None,
    ) -> bool:
        """Store session state with optional TTL.

        Args:
            session_state: Session state to store
            ttl_seconds: Time-to-live in seconds (for expiration tracking).
                None and 0 both mean "no TTL recorded".

        Returns:
            True if successful, False otherwise

        Note:
            TTL is stored as a ``_ttl`` entry inside the persisted dict and
            mirrored in the in-memory metadata cache. Expiration is enforced
            by this adapter on retrieval/listing/cleanup.

        """
        try:
            storage = await self._ensure_storage()

            # Convert SessionState to dict
            state_dict = session_state.to_dict()

            # Add TTL metadata if provided
            if ttl_seconds:
                expires_at = datetime.now() + timedelta(seconds=ttl_seconds)
                state_dict["_ttl"] = {
                    "ttl_seconds": ttl_seconds,
                    "expires_at": expires_at.isoformat(),
                }

            # Store using SessionStorageAdapter
            await storage.store_session(session_state.session_id, state_dict)

            # Cache metadata for TTL tracking (only after a successful write)
            self._session_metadata[session_state.session_id] = {
                "user_id": session_state.user_id,
                "project_id": session_state.project_id,
                "created_at": session_state.created_at,
                "ttl_seconds": ttl_seconds,
                "expires_at": state_dict.get("_ttl", {}).get("expires_at"),
            }

            return True

        except Exception as e:
            self.logger.exception(
                f"Failed to store session {session_state.session_id}: {e}"
            )
            return False

    async def retrieve_session(self, session_id: str) -> SessionState | None:
        """Retrieve session state by ID.

        Args:
            session_id: Unique session identifier

        Returns:
            SessionState if found and not expired, None otherwise (including
            when any error occurs; the error is logged).

        Note:
            Checks TTL expiration before returning session; an expired
            session is deleted as a side effect.

        """
        try:
            storage = await self._ensure_storage()

            # Load from SessionStorageAdapter
            state_dict = await storage.load_session(session_id)

            if not state_dict:
                return None

            # Check TTL expiration
            ttl_info = state_dict.get("_ttl", {})
            if ttl_info and "expires_at" in ttl_info:
                expires_at = datetime.fromisoformat(ttl_info["expires_at"])
                # Use the timestamp's tzinfo so naive and aware values both
                # compare cleanly.
                if datetime.now(expires_at.tzinfo) > expires_at:
                    # Session expired, delete it
                    await self.delete_session(session_id)
                    return None

            # Remove TTL metadata before creating SessionState
            state_dict.pop("_ttl", None)
            state_dict.pop("_metadata", None)  # Remove storage metadata

            # Convert dict back to SessionState
            return SessionState.from_dict(state_dict)

        except Exception as e:
            self.logger.exception(f"Failed to retrieve session {session_id}: {e}")
            return None

    async def delete_session(self, session_id: str) -> bool:
        """Delete session state.

        Args:
            session_id: Unique session identifier

        Returns:
            Whatever the underlying storage reports for the deletion, or
            False if an error occurred (the error is logged).

        """
        try:
            storage = await self._ensure_storage()

            # Delete from storage
            success = await storage.delete_session(session_id)

            # Remove from metadata cache
            self._session_metadata.pop(session_id, None)

            return success

        except Exception as e:
            self.logger.exception(f"Failed to delete session {session_id}: {e}")
            return False

    async def list_sessions(
        self,
        user_id: str | None = None,
        project_id: str | None = None,
    ) -> list[str]:
        """List session IDs matching criteria.

        Args:
            user_id: Filter by user ID (optional)
            project_id: Filter by project ID (optional)

        Returns:
            List of non-expired session IDs

        Note:
            Current implementation returns sessions from metadata cache.
            Full implementation would require iterating through storage.

        """
        matching_sessions = []

        for session_id, metadata in self._session_metadata.items():
            if user_id and metadata.get("user_id") != user_id:
                continue
            if project_id and metadata.get("project_id") != project_id:
                continue

            # Skip expired entries; malformed timestamps count as not expired
            if self._is_expired(metadata.get("expires_at")):
                continue

            matching_sessions.append(session_id)

        return matching_sessions

    async def cleanup_expired_sessions(self) -> int:
        """Clean up expired sessions.

        Returns:
            Number of sessions deleted

        Note:
            Iterates through metadata cache to find expired sessions.

        """
        deleted_count = 0

        # Collect first: delete_session mutates the cache, so it must not be
        # called while iterating over it.
        expired_sessions = [
            session_id
            for session_id, metadata in self._session_metadata.items()
            if self._is_expired(metadata.get("expires_at"))
        ]

        # Delete expired sessions
        for session_id in expired_sessions:
            try:
                success = await self.delete_session(session_id)
                if success:
                    deleted_count += 1
            except Exception as e:
                self.logger.warning(
                    f"Failed to delete expired session {session_id}: {e}"
                )

        if deleted_count > 0:
            self.logger.info(f"Cleaned up {deleted_count} expired sessions")

        return deleted_count

    async def is_available(self) -> bool:
        """Check if storage backend is available.

        Performs a store/load/delete round trip with a fixed probe key.

        Returns:
            True if storage is accessible, False otherwise

        """
        try:
            storage = await self._ensure_storage()

            # Test storage with a ping operation
            test_session_id = "_health_check_"
            test_state = {"test": True}

            # Try to store and retrieve
            await storage.store_session(test_session_id, test_state)
            try:
                result = await storage.load_session(test_session_id)
            finally:
                # Always remove the probe record once it has been written,
                # even when the load fails.
                await storage.delete_session(test_session_id)

            return result is not None

        except Exception as e:
            self.logger.warning(f"Storage backend unavailable: {e}")
            return False
def create_serverless_storage(
    backend: str = "file",
    config: dict[str, t.Any] | None = None,
) -> ServerlessStorageAdapter:
    """Build a ServerlessStorageAdapter for the given backend.

    Convenience factory that forwards both arguments to the
    ServerlessStorageAdapter constructor by keyword.

    Args:
        backend: Storage backend name handed to the adapter.
        config: Legacy config dict (kept for compatibility); may be None.

    Returns:
        A newly constructed ServerlessStorageAdapter.

    Example:
        >>> storage = create_serverless_storage("s3")
        >>> await storage.store_session(session_state)

    """
    adapter = ServerlessStorageAdapter(backend=backend, config=config)
    return adapter
# Public API of this module: the adapter class and its factory function.
__all__ = [
    "ServerlessStorageAdapter",
    "create_serverless_storage",
]