Coverage for session_buddy/backends/s3_backend.py: 15.07%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""S3-compatible session storage backend. 

2 

3**DEPRECATED**: This module is deprecated and will be removed in v1.0. 

4Use ServerlessStorageAdapter with backend="s3" instead, which uses ACB's 

5native S3 storage adapter for better performance and features. 

6 

7Migration: 

8 Old: S3Storage(config) 

9 New: ServerlessStorageAdapter(config, backend="s3") 

10 

11This module provides an S3-compatible implementation of the SessionStorage interface 

12for storing and retrieving session state in S3-compatible object storage. 

13""" 

14 

15from __future__ import annotations 

16 

17import asyncio 

18import gzip 

19import json 

20import warnings 

21from datetime import UTC, datetime, timedelta 

22from typing import Any 

23 

24from session_buddy.backends.base import SessionState, SessionStorage 

25 

26 

class S3Storage(SessionStorage):
    """S3-based session storage.

    Sessions are stored as gzip-compressed JSON objects under
    ``{key_prefix}{session_id}.json.gz`` in the configured bucket.

    .. deprecated:: 0.9.3
        S3Storage is deprecated. Use ``ServerlessStorageAdapter(backend="s3")``
        which provides better performance, connection pooling, and streaming support
        via ACB's native S3 storage adapter.

    """

    def __init__(self, config: dict[str, Any]) -> None:
        """Initialize the backend from a plain config mapping.

        Args:
            config: Backend settings. Recognized keys: ``bucket_name``
                (default ``"session-mgmt-mcp"``), ``region`` (default
                ``"us-east-1"``), ``key_prefix`` (default ``"sessions/"``),
                ``access_key_id`` and ``secret_access_key`` (default ``None``,
                letting boto3 fall back to its normal credential chain).

        """
        warnings.warn(
            "S3Storage is deprecated and will be removed in v1.0. "
            "Use ServerlessStorageAdapter(backend='s3') instead for better "
            "performance and ACB integration.",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(config)
        self.bucket_name = config.get("bucket_name", "session-mgmt-mcp")
        self.region = config.get("region", "us-east-1")
        self.key_prefix = config.get("key_prefix", "sessions/")
        self.access_key_id = config.get("access_key_id")
        self.secret_access_key = config.get("secret_access_key")
        # Lazily created boto3 client; see _get_s3_client().
        self._s3_client: Any = None

    async def _get_s3_client(self) -> Any:
        """Get or lazily create the boto3 S3 client.

        Returns:
            A cached boto3 ``s3`` client configured with retries and a
            widened connection pool.

        Raises:
            ImportError: If the ``boto3`` package is not installed.

        """
        if self._s3_client is None:
            # Keep the try body minimal: only the imports can raise here.
            try:
                import boto3
                from botocore.client import Config
            except ImportError as exc:
                msg = "Boto3 package not installed. Install with: pip install boto3"
                raise ImportError(msg) from exc

            session = boto3.Session(
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name=self.region,
            )
            self._s3_client = session.client(
                "s3",
                config=Config(retries={"max_attempts": 3}, max_pool_connections=50),
            )

        return self._s3_client

    def _get_key(self, session_id: str) -> str:
        """Build the S3 object key for a session ID."""
        return f"{self.key_prefix}{session_id}.json.gz"

    async def store_session(
        self,
        session_state: SessionState,
        ttl_seconds: int | None = None,
    ) -> bool:
        """Store a session in S3 as gzip-compressed JSON.

        Args:
            session_state: Session to persist; serialized via ``to_dict()``.
            ttl_seconds: Optional lifetime used to set the object's
                ``Expires`` attribute.

        Returns:
            True on success, False on any failure (logged, not raised).

        """
        try:
            s3_client = await self._get_s3_client()

            # Serialize and compress session state.
            serialized = json.dumps(session_state.to_dict())
            compressed = gzip.compress(serialized.encode("utf-8"))

            # Filterable attributes mirrored into S3 object metadata
            # (read back by _get_s3_object_metadata / list_sessions).
            metadata = {
                "user_id": session_state.user_id,
                "project_id": session_state.project_id,
                "created_at": session_state.created_at,
                "last_activity": session_state.last_activity,
            }

            put_args: dict[str, Any] = {
                "Bucket": self.bucket_name,
                "Key": self._get_key(session_state.session_id),
                "Body": compressed,
                "ContentType": "application/json",
                "ContentEncoding": "gzip",
                "Metadata": metadata,
            }

            if ttl_seconds:
                # NOTE(review): "Expires" is the HTTP caching header, not an
                # S3 lifecycle rule — S3 will NOT delete the object for us.
                # Actual expiry relies on bucket lifecycle policies or
                # cleanup_expired_sessions().
                put_args["Expires"] = datetime.now(UTC) + timedelta(
                    seconds=ttl_seconds
                )

            # boto3 is synchronous; run it in the default thread pool.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, lambda: s3_client.put_object(**put_args))

            return True

        except Exception as e:
            self.logger.exception(
                f"Failed to store session {session_state.session_id}: {e}",
            )
            return False

    async def retrieve_session(self, session_id: str) -> SessionState | None:
        """Retrieve a session from S3, or None if missing or on error."""
        try:
            s3_client = await self._get_s3_client()
            key = self._get_key(session_id)

            # Download from S3 (boto3 is synchronous; use the thread pool).
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: s3_client.get_object(Bucket=self.bucket_name, Key=key),
            )

            # Decompress and deserialize.
            compressed_data = response["Body"].read()
            serialized = gzip.decompress(compressed_data).decode("utf-8")
            session_data = json.loads(serialized)

            return SessionState.from_dict(session_data)

        except Exception as e:
            # Includes the "key not found" case; callers only see None.
            self.logger.exception(f"Failed to retrieve session {session_id}: {e}")
            return None

    async def delete_session(self, session_id: str) -> bool:
        """Delete a session object from S3.

        Returns:
            True on success (including deleting a nonexistent key, which S3
            treats as success), False on error.

        """
        try:
            s3_client = await self._get_s3_client()
            key = self._get_key(session_id)

            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: s3_client.delete_object(Bucket=self.bucket_name, Key=key),
            )

            return True

        except Exception as e:
            self.logger.exception(f"Failed to delete session {session_id}: {e}")
            return False

    async def list_sessions(
        self,
        user_id: str | None = None,
        project_id: str | None = None,
    ) -> list[str]:
        """List session IDs in S3, optionally filtered by user/project.

        Args:
            user_id: If given, keep only sessions whose metadata matches.
            project_id: If given, keep only sessions whose metadata matches.

        Returns:
            Matching session IDs; empty list on error.

        """
        try:
            s3_client = await self._get_s3_client()
            s3_objects = await self._get_s3_objects(s3_client)

            session_ids = []
            for obj in s3_objects:
                key = obj["Key"]
                # Filtering requires a HEAD request per object, so it is
                # only performed when a filter is actually set.
                if await self._should_include_s3_session(
                    s3_client,
                    key,
                    user_id,
                    project_id,
                ):
                    session_ids.append(self._extract_session_id_from_key(key))

            return session_ids

        except Exception as e:
            self.logger.exception(f"Failed to list sessions: {e}")
            return []

    async def _get_s3_objects(self, s3_client: Any) -> list[dict[str, Any]]:
        """Get S3 objects with the configured prefix.

        NOTE(review): list_objects_v2 returns at most 1000 keys per call;
        pagination (ContinuationToken) is not handled here — confirm whether
        buckets can exceed that.
        """
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: s3_client.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=self.key_prefix,
            ),
        )
        contents = response.get("Contents", [])
        return list(contents) if contents else []

    def _extract_session_id_from_key(self, key: str) -> str:
        """Extract the session ID from an S3 object key (inverse of _get_key)."""
        return key.replace(self.key_prefix, "").replace(".json.gz", "")

    async def _should_include_s3_session(
        self,
        s3_client: Any,
        key: str,
        user_id: str | None,
        project_id: str | None,
    ) -> bool:
        """Check whether an S3 session matches the user/project filters.

        With no filters set, every session is included without a metadata
        fetch; otherwise the object's metadata is consulted.
        """
        if not user_id and not project_id:
            return True

        metadata = await self._get_s3_object_metadata(s3_client, key)

        if user_id and metadata.get("user_id") != user_id:
            return False
        return not (project_id and metadata.get("project_id") != project_id)

    async def _get_s3_object_metadata(self, s3_client: Any, key: str) -> dict[str, Any]:
        """Get the user-defined metadata of an S3 object via HEAD."""
        loop = asyncio.get_running_loop()
        head_response = await loop.run_in_executor(
            None,
            lambda: s3_client.head_object(Bucket=self.bucket_name, Key=key),
        )
        metadata = head_response.get("Metadata", {})
        return dict(metadata) if metadata else {}

    async def cleanup_expired_sessions(self) -> int:
        """Delete session objects older than 30 days.

        S3 lifecycle policies should handle expiration automatically in
        production; this method is a manual fallback sweep.

        Returns:
            Number of objects deleted; 0 on error.

        """
        try:
            s3_client = await self._get_s3_client()

            now = datetime.now(UTC)
            cleaned = 0

            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: s3_client.list_objects_v2(
                    Bucket=self.bucket_name,
                    Prefix=self.key_prefix,
                ),
            )

            for obj in response.get("Contents", []):
                # boto3 returns LastModified as a timezone-aware datetime, so
                # subtract it from the aware "now" directly. (The previous
                # .replace(tzinfo=None) made it naive and the subtraction
                # raised TypeError, so cleanup always aborted with 0.)
                age_days = (now - obj["LastModified"]).days

                if age_days > 30:  # Cleanup sessions older than 30 days
                    # Bind the key now: a lambda closing over the loop
                    # variable `obj` would be fragile (late binding).
                    key = obj["Key"]
                    await loop.run_in_executor(
                        None,
                        lambda key=key: s3_client.delete_object(
                            Bucket=self.bucket_name,
                            Key=key,
                        ),
                    )
                    cleaned += 1

            return cleaned

        except Exception as e:
            self.logger.exception(f"Failed to cleanup expired sessions: {e}")
            return 0

    async def is_available(self) -> bool:
        """Check if S3 is reachable and the bucket is accessible."""
        try:
            s3_client = await self._get_s3_client()

            # A HEAD on the bucket verifies both connectivity and access.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: s3_client.head_bucket(Bucket=self.bucket_name),
            )

            return True
        except Exception:
            return False