Coverage for session_buddy / adapters / session_storage_adapter.py: 96.12%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Session storage adapter facade for unified session state persistence. 

2 

3This module provides a high-level interface for storing and retrieving session 

4state using Oneiric storage adapters as the backend. It abstracts away the 

5bucket-based API of the storage adapters and provides a session-centric interface. 

6 

7The adapter supports the Oneiric storage backends registered in the storage registry. 

8 

9Example: 

10 >>> from session_buddy.adapters import SessionStorageAdapter 

11 >>> storage = SessionStorageAdapter(backend="file") 

12 >>> await storage.store_session("session_123", {"status": "active"}) 

13 >>> state = await storage.load_session("session_123") 

14 >>> print(state["status"]) 

15 active 

16 

17""" 

18 

19from __future__ import annotations 

20 

21import json 

22import typing as t 

23from contextlib import suppress 

24from datetime import datetime 

25from pathlib import Path 

26 

27if t.TYPE_CHECKING: 

28 from session_buddy.adapters.storage_oneiric import StorageBaseOneiric 

29 

# Default bucket for session storage
DEFAULT_SESSION_BUCKET = "sessions"


class SessionStorageAdapter:
    """Session-centric facade over an Oneiric storage adapter.

    The underlying adapter exposes a bucket/path API. This class hides that
    behind session IDs: it builds the object path, serializes state to JSON,
    stamps it with a ``_metadata`` entry, and maps "not found" conditions to
    ``None`` / ``False`` return values.

    The backend adapter is resolved lazily through the storage registry on
    first use, so constructing this object performs no I/O.

    Attributes:
        backend: Name of the storage backend passed to the storage registry
            (e.g. "file" or "memory").
        bucket: Bucket name for session storage (default: "sessions").

    Example:
        >>> storage = SessionStorageAdapter(backend="file")
        >>> await storage.store_session("abc123", {"status": "active"})
        >>> state = await storage.load_session("abc123")

    """

    def __init__(self, backend: str = "file", bucket: str = DEFAULT_SESSION_BUCKET):
        """Record the backend/bucket configuration without touching storage.

        No validation happens here: an unknown backend is only detected when
        the adapter is first resolved (see ``_ensure_adapter``).

        Args:
            backend: Storage backend name understood by the storage registry.
            bucket: Bucket name for session storage (default: "sessions").

        """
        self.backend = backend
        self.bucket = bucket
        # Resolved lazily on first use.
        self._adapter: StorageBaseOneiric | None = None
        # True once ``init()`` has completed on the resolved adapter.
        self._initialized = False

    async def _ensure_adapter(self) -> StorageBaseOneiric:
        """Resolve and initialize the backend adapter on first use.

        Returns:
            The initialized storage adapter instance.

        Raises:
            Whatever ``get_storage_adapter`` raises for an unknown backend
            (presumably ``ValueError`` -- confirm against the registry).

        """
        if self._adapter is None:
            # Imported at call time rather than at module import time.
            from session_buddy.adapters.storage_registry import get_storage_adapter

            self._adapter = get_storage_adapter(self.backend)

        if not self._initialized:
            # NOTE(review): there is an await between this check and the flag
            # update, so concurrent first calls can each run ``init()``.
            await self._adapter.init()
            self._initialized = True

        return self._adapter

    def _get_session_path(self, session_id: str, filename: str = "state.json") -> str:
        """Build the storage path for a file belonging to a session.

        Args:
            session_id: Unique session identifier.
            filename: Filename within the session directory
                (default: state.json).

        Returns:
            Path string in the form ``session_id/filename``. The inputs are
            interpolated as-is; no sanitization is performed.

        Example:
            >>> adapter._get_session_path("abc123")
            'abc123/state.json'
            >>> adapter._get_session_path("abc123", "checkpoint_001.json")
            'abc123/checkpoint_001.json'

        """
        return f"{session_id}/{filename}"

    @staticmethod
    def _read_payload(payload: t.Any) -> t.Any:
        """Return the raw content of a download result, closing any stream.

        Args:
            payload: Either ``bytes``/``bytearray`` or a file-like object
                exposing ``read()``.

        Returns:
            The payload itself when it is already bytes-like, otherwise the
            result of ``payload.read()``.

        """
        if isinstance(payload, (bytes, bytearray)):
            return payload
        try:
            return payload.read()
        finally:
            # Release the handle even if read() fails; objects without a
            # close() method are left alone.
            close = getattr(payload, "close", None)
            if callable(close):
                close()

    async def store_session(
        self,
        session_id: str,
        state: dict[str, t.Any],
        filename: str = "state.json",
    ) -> None:
        """Serialize session state to JSON and upload it to the backend.

        A ``_metadata`` entry (session id, timestamp, backend name) is added
        to the stored document, replacing any ``_metadata`` key already in
        ``state``. The caller's ``state`` object is not modified.

        Args:
            session_id: Unique session identifier.
            state: Session state mapping to store.
            filename: Filename for the state file (default: state.json).

        Raises:
            ValueError: If ``state`` is not a mapping or cannot be
                serialized to JSON.

        Example:
            >>> await storage.store_session("abc123", {"status": "active"})
            >>> await storage.store_session(
            ...     "abc123", checkpoint_data, "checkpoint_001.json"
            ... )

        """
        adapter = await self._ensure_adapter()
        path = self._get_session_path(session_id, filename)

        metadata = {
            "session_id": session_id,
            # NOTE(review): naive local timestamp, no UTC offset recorded.
            "stored_at": datetime.now().isoformat(),
            "backend": self.backend,
        }

        try:
            # The merge lives inside the try so that a non-mapping ``state``
            # surfaces as the documented ValueError, not a bare TypeError.
            enhanced_state = {**state, "_metadata": metadata}
            data = json.dumps(enhanced_state, indent=2).encode("utf-8")
        except (TypeError, ValueError) as e:
            msg = f"Failed to serialize session state: {e}"
            raise ValueError(msg) from e

        await adapter.upload(self.bucket, path, data)

    async def load_session(
        self,
        session_id: str,
        filename: str = "state.json",
    ) -> dict[str, t.Any] | None:
        """Download and parse a stored session document.

        Args:
            session_id: Unique session identifier.
            filename: Filename to load (default: state.json).

        Returns:
            The parsed JSON document (including the ``_metadata`` entry
            written by ``store_session``), or None if it does not exist.

        Raises:
            ValueError: If the stored data is not valid UTF-8 encoded JSON.

        Example:
            >>> state = await storage.load_session("abc123")
            >>> if state:
            ...     print(state["status"])

        """
        adapter = await self._ensure_adapter()
        path = self._get_session_path(session_id, filename)

        # Best-effort existence probe: if exists() itself fails, fall through
        # and let the download decide.
        with suppress(Exception):
            if not await adapter.exists(self.bucket, path):
                return None

        try:
            payload = await adapter.download(self.bucket, path)
            # download() may hand back raw bytes or a file-like object; the
            # helper closes the latter once it has been read.
            data = self._read_payload(payload)
            result: dict[str, t.Any] = json.loads(data.decode("utf-8"))
        except FileNotFoundError:
            return None
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            msg = f"Failed to parse session state: {e}"
            raise ValueError(msg) from e
        return result

    async def delete_session(
        self,
        session_id: str,
        filename: str | None = None,
    ) -> bool:
        """Delete a session file from the storage backend.

        Args:
            session_id: Unique session identifier.
            filename: Specific file to delete. When omitted (or empty), only
                ``state.json`` is removed; other files stored under the
                session are left in place.

        Returns:
            True if the delete call succeeded, False if the backend reported
            the file as missing (``FileNotFoundError``).

        Example:
            >>> await storage.delete_session("abc123")  # Deletes state.json
            True
            >>> await storage.delete_session("abc123", "checkpoint_001.json")
            True

        """
        adapter = await self._ensure_adapter()

        # TODO: Implement directory listing and bulk delete so that omitting
        # ``filename`` removes every file of the session.
        path = self._get_session_path(session_id, filename or "state.json")
        try:
            await adapter.delete(self.bucket, path)
        except FileNotFoundError:
            return False
        return True

    async def session_exists(self, session_id: str) -> bool:
        """Check whether the session's ``state.json`` exists in storage.

        Args:
            session_id: Unique session identifier.

        Returns:
            True if the backend reports the file exists. False otherwise,
            including when the existence check itself raises.

        Example:
            >>> if await storage.session_exists("abc123"):
            ...     state = await storage.load_session("abc123")

        """
        adapter = await self._ensure_adapter()
        path = self._get_session_path(session_id)

        try:
            return bool(await adapter.exists(self.bucket, path))
        except Exception:
            # Deliberate best-effort: any backend error is treated as
            # "session does not exist".
            return False

    async def get_session_metadata(
        self,
        session_id: str,
    ) -> dict[str, t.Any] | None:
        """Get storage-level metadata for a session without loading its state.

        Args:
            session_id: Unique session identifier.

        Returns:
            None if the backend raises ``FileNotFoundError``. Otherwise a
            dictionary with ``session_id`` and ``backend``, plus ``size``,
            ``modified`` and ``created`` when ``stat()`` succeeds. If
            ``stat()`` fails for any other reason only ``session_id`` and
            ``backend`` are present.

        Example:
            >>> metadata = await storage.get_session_metadata("abc123")
            >>> if metadata and "size" in metadata:
            ...     print(f"Session size: {metadata['size']} bytes")

        """
        adapter = await self._ensure_adapter()
        path = self._get_session_path(session_id)

        try:
            stats = await adapter.stat(self.bucket, path)
            return {
                "session_id": session_id,
                "size": stats.get("size", 0),
                # Two key spellings are probed for each timestamp.
                "modified": stats.get("updated") or stats.get("mtime"),
                "created": stats.get("timeCreated") or stats.get("created"),
                "backend": self.backend,
            }
        except FileNotFoundError:
            return None
        except Exception:
            # Return minimal metadata on error
            return {
                "session_id": session_id,
                "backend": self.backend,
            }

    async def list_sessions(self) -> list[str]:
        """List all session IDs in storage.

        Returns:
            Currently always an empty list: listing is not implemented yet.

        Example:
            >>> sessions = await storage.list_sessions()
            >>> print(f"Found {len(sessions)} sessions")

        """
        # TODO: Implement by iterating the bucket contents and extracting
        # session IDs. Not critical for Phase 1.
        return []

338 

339 

def get_default_storage_adapter() -> SessionStorageAdapter:
    """Build a session storage adapter for the configured default backend.

    The backend name is read from ``StorageAdapterSettings.from_settings()``;
    when ``default_backend`` is unset or empty, ``"file"`` is used.

    Returns:
        SessionStorageAdapter configured with the default backend

    Example:
        >>> storage = get_default_storage_adapter()
        >>> await storage.store_session("abc123", {"status": "active"})

    """
    # Imported at call time rather than at module import time.
    from session_buddy.adapters.settings import StorageAdapterSettings

    configured_backend = StorageAdapterSettings.from_settings().default_backend
    if not configured_backend:
        configured_backend = "file"
    return SessionStorageAdapter(backend=configured_backend)

356 

357 

# Public names of this module; these are what ``from ... import *`` exports.
__all__ = [
    "DEFAULT_SESSION_BUCKET",
    "SessionStorageAdapter",
    "get_default_storage_adapter",
]