Coverage for session_buddy / adapters / serverless_storage_adapter.py: 84.51%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Serverless storage adapter bridge for session state persistence. 

2 

3This module provides a bridge between the old SessionStorage protocol (used by 

4serverless_mode.py) and the Oneiric SessionStorageAdapter. This enables 

5serverless deployments to use Oneiric storage adapters while maintaining API 

6compatibility. 

7 

8Architecture: 

9 Old: serverless_mode.py → SessionStorage protocol → legacy backends 

10 New: serverless_mode.py → ServerlessStorageAdapter → SessionStorageAdapter → Oneiric storage 

11 

12Example: 

13 >>> from session_buddy.adapters import ServerlessStorageAdapter 

14 >>> storage = ServerlessStorageAdapter(backend="file") 

15 >>> await storage.store_session(session_state, ttl_seconds=3600) 

16 True 

17 

18""" 

19 

20from __future__ import annotations 

21 

22import json 

23import logging 

24import typing as t 

25from datetime import datetime, timedelta 

26 

27from session_buddy.backends.base import SessionState, SessionStorage 

28 

29if t.TYPE_CHECKING: 

30 from session_buddy.adapters.session_storage_adapter import ( 

31 SessionStorageAdapter, 

32 ) 

33 

34 

35class ServerlessStorageAdapter(SessionStorage): 

36 """Bridge adapter implementing SessionStorage protocol using SessionStorageAdapter. 

37 

38 This adapter maintains backward compatibility with serverless_mode.py while 

39 using the Oneiric SessionStorageAdapter underneath. 

40 

41 Attributes: 

42 backend: Storage backend type (file, memory) 

43 _storage: Internal SessionStorageAdapter instance 

44 _session_metadata: Cache for session metadata (TTL tracking) 

45 

46 Example: 

47 >>> adapter = ServerlessStorageAdapter(backend="s3") 

48 >>> await adapter.store_session(session_state, ttl_seconds=3600) 

49 >>> state = await adapter.retrieve_session("session_123") 

50 

51 """ 

52 

53 def __init__(self, config: dict[str, t.Any] | None = None, backend: str = "file"): 

54 """Initialize serverless storage adapter. 

55 

56 Args: 

57 config: Legacy config dict (for compatibility, mostly ignored) 

58 backend: Storage backend type (s3, file, azure, gcs, memory) 

59 

60 """ 

61 super().__init__(config or {}) 

62 self.backend = backend 

63 self._storage: SessionStorageAdapter | None = None 

64 self._session_metadata: dict[str, dict[str, t.Any]] = {} 

65 self.logger = logging.getLogger(f"serverless.storage.{backend}") 

66 

67 async def _ensure_storage(self) -> SessionStorageAdapter: 

68 """Ensure storage adapter is initialized. 

69 

70 Returns: 

71 Initialized SessionStorageAdapter instance 

72 

73 """ 

74 if self._storage is None: 

75 from session_buddy.adapters.session_storage_adapter import ( 

76 SessionStorageAdapter, 

77 ) 

78 

79 self._storage = SessionStorageAdapter(backend=self.backend) 

80 

81 return self._storage 

82 

83 async def store_session( 

84 self, 

85 session_state: SessionState, 

86 ttl_seconds: int | None = None, 

87 ) -> bool: 

88 """Store session state with optional TTL. 

89 

90 Args: 

91 session_state: Session state to store 

92 ttl_seconds: Time-to-live in seconds (for expiration tracking) 

93 

94 Returns: 

95 True if successful, False otherwise 

96 

97 Note: 

98 TTL is stored as metadata for cleanup purposes. Actual expiration 

99 depends on the storage backend's TTL support. 

100 

101 """ 

102 try: 

103 storage = await self._ensure_storage() 

104 

105 # Convert SessionState to dict 

106 state_dict = session_state.to_dict() 

107 

108 # Add TTL metadata if provided 

109 if ttl_seconds: 

110 expires_at = datetime.now() + timedelta(seconds=ttl_seconds) 

111 state_dict["_ttl"] = { 

112 "ttl_seconds": ttl_seconds, 

113 "expires_at": expires_at.isoformat(), 

114 } 

115 

116 # Store using SessionStorageAdapter 

117 await storage.store_session(session_state.session_id, state_dict) 

118 

119 # Cache metadata for TTL tracking 

120 self._session_metadata[session_state.session_id] = { 

121 "user_id": session_state.user_id, 

122 "project_id": session_state.project_id, 

123 "created_at": session_state.created_at, 

124 "ttl_seconds": ttl_seconds, 

125 "expires_at": state_dict.get("_ttl", {}).get("expires_at"), 

126 } 

127 

128 return True 

129 

130 except Exception as e: 

131 self.logger.exception( 

132 f"Failed to store session {session_state.session_id}: {e}" 

133 ) 

134 return False 

135 

136 async def retrieve_session(self, session_id: str) -> SessionState | None: 

137 """Retrieve session state by ID. 

138 

139 Args: 

140 session_id: Unique session identifier 

141 

142 Returns: 

143 SessionState if found, None otherwise 

144 

145 Note: 

146 Checks TTL expiration before returning session. 

147 

148 """ 

149 try: 

150 storage = await self._ensure_storage() 

151 

152 # Load from SessionStorageAdapter 

153 state_dict = await storage.load_session(session_id) 

154 

155 if not state_dict: 

156 return None 

157 

158 # Check TTL expiration 

159 ttl_info = state_dict.get("_ttl", {}) 

160 if ttl_info and "expires_at" in ttl_info: 

161 expires_at = datetime.fromisoformat(ttl_info["expires_at"]) 

162 if datetime.now() > expires_at: 

163 # Session expired, delete it 

164 await self.delete_session(session_id) 

165 return None 

166 

167 # Remove TTL metadata before creating SessionState 

168 state_dict.pop("_ttl", None) 

169 state_dict.pop("_metadata", None) # Remove storage metadata 

170 

171 # Convert dict back to SessionState 

172 return SessionState.from_dict(state_dict) 

173 

174 except Exception as e: 

175 self.logger.exception(f"Failed to retrieve session {session_id}: {e}") 

176 return None 

177 

178 async def delete_session(self, session_id: str) -> bool: 

179 """Delete session state. 

180 

181 Args: 

182 session_id: Unique session identifier 

183 

184 Returns: 

185 True if deleted, False if not found or error 

186 

187 """ 

188 try: 

189 storage = await self._ensure_storage() 

190 

191 # Delete from storage 

192 success = await storage.delete_session(session_id) 

193 

194 # Remove from metadata cache 

195 self._session_metadata.pop(session_id, None) 

196 

197 return success 

198 

199 except Exception as e: 

200 self.logger.exception(f"Failed to delete session {session_id}: {e}") 

201 return False 

202 

203 async def list_sessions( 

204 self, 

205 user_id: str | None = None, 

206 project_id: str | None = None, 

207 ) -> list[str]: 

208 """List session IDs matching criteria. 

209 

210 Args: 

211 user_id: Filter by user ID (optional) 

212 project_id: Filter by project ID (optional) 

213 

214 Returns: 

215 List of session IDs 

216 

217 Note: 

218 Current implementation returns sessions from metadata cache. 

219 Full implementation would require iterating through storage. 

220 

221 """ 

222 # Filter from metadata cache 

223 matching_sessions = [] 

224 

225 for session_id, metadata in self._session_metadata.items(): 

226 if user_id and metadata.get("user_id") != user_id: 

227 continue 

228 if project_id and metadata.get("project_id") != project_id: 228 ↛ 229line 228 didn't jump to line 229 because the condition on line 228 was never true

229 continue 

230 

231 # Check if expired 

232 expires_at_str = metadata.get("expires_at") 

233 if expires_at_str: 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true

234 expires_at = datetime.fromisoformat(expires_at_str) 

235 if datetime.now() > expires_at: 

236 continue 

237 

238 matching_sessions.append(session_id) 

239 

240 return matching_sessions 

241 

242 async def cleanup_expired_sessions(self) -> int: 

243 """Clean up expired sessions. 

244 

245 Returns: 

246 Number of sessions deleted 

247 

248 Note: 

249 Iterates through metadata cache to find expired sessions. 

250 

251 """ 

252 deleted_count = 0 

253 expired_sessions = [] 

254 

255 # Find expired sessions 

256 for session_id, metadata in self._session_metadata.items(): 

257 expires_at_str = metadata.get("expires_at") 

258 if not expires_at_str: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true

259 continue 

260 

261 try: 

262 expires_at = datetime.fromisoformat(expires_at_str) 

263 if datetime.now() > expires_at: 

264 expired_sessions.append(session_id) 

265 except ValueError: 

266 # Invalid timestamp, skip 

267 continue 

268 

269 # Delete expired sessions 

270 for session_id in expired_sessions: 

271 try: 

272 success = await self.delete_session(session_id) 

273 if success: 273 ↛ 270line 273 didn't jump to line 270 because the condition on line 273 was always true

274 deleted_count += 1 

275 except Exception as e: 

276 self.logger.warning( 

277 f"Failed to delete expired session {session_id}: {e}" 

278 ) 

279 

280 if deleted_count > 0: 280 ↛ 283line 280 didn't jump to line 283 because the condition on line 280 was always true

281 self.logger.info(f"Cleaned up {deleted_count} expired sessions") 

282 

283 return deleted_count 

284 

285 async def is_available(self) -> bool: 

286 """Check if storage backend is available. 

287 

288 Returns: 

289 True if storage is accessible, False otherwise 

290 

291 """ 

292 try: 

293 storage = await self._ensure_storage() 

294 

295 # Test storage with a ping operation 

296 test_session_id = "_health_check_" 

297 test_state = {"test": True} 

298 

299 # Try to store and retrieve 

300 await storage.store_session(test_session_id, test_state) 

301 result = await storage.load_session(test_session_id) 

302 

303 # Cleanup test session 

304 await storage.delete_session(test_session_id) 

305 

306 return result is not None 

307 

308 except Exception as e: 

309 self.logger.warning(f"Storage backend unavailable: {e}") 

310 return False 

311 

312 

313def create_serverless_storage( 

314 backend: str = "file", 

315 config: dict[str, t.Any] | None = None, 

316) -> ServerlessStorageAdapter: 

317 """Factory function to create serverless storage adapter. 

318 

319 Args: 

320 backend: Storage backend type (file, memory) 

321 config: Legacy config dict (for compatibility) 

322 

323 Returns: 

324 Configured ServerlessStorageAdapter instance 

325 

326 Example: 

327 >>> storage = create_serverless_storage("s3") 

328 >>> await storage.store_session(session_state) 

329 

330 """ 

331 return ServerlessStorageAdapter(config=config, backend=backend) 

332 

333 

334__all__ = [ 

335 "ServerlessStorageAdapter", 

336 "create_serverless_storage", 

337]