Coverage for session_buddy/backends/s3_backend.py: 15.07% (128 statements)
coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1"""S3-compatible session storage backend.
3**DEPRECATED**: This module is deprecated and will be removed in v1.0.
4Use ServerlessStorageAdapter with backend="s3" instead, which uses ACB's
5native S3 storage adapter for better performance and features.
7Migration:
8 Old: S3Storage(config)
9 New: ServerlessStorageAdapter(config, backend="s3")
11This module provides an S3-compatible implementation of the SessionStorage interface
12for storing and retrieving session state in S3-compatible object storage.
13"""

from __future__ import annotations

import asyncio
import gzip
import json
import warnings
from datetime import UTC, datetime, timedelta
from typing import Any

from session_buddy.backends.base import SessionState, SessionStorage


class S3Storage(SessionStorage):
    """S3-based session storage.

    .. deprecated:: 0.9.3
        S3Storage is deprecated. Use ``ServerlessStorageAdapter(backend="s3")``,
        which provides better performance, connection pooling, and streaming
        support via ACB's native S3 storage adapter.
    """

    def __init__(self, config: dict[str, Any]) -> None:
        warnings.warn(
            "S3Storage is deprecated and will be removed in v1.0. "
            "Use ServerlessStorageAdapter(backend='s3') instead for better "
            "performance and ACB integration.",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(config)
        self.bucket_name = config.get("bucket_name", "session-mgmt-mcp")
        self.region = config.get("region", "us-east-1")
        self.key_prefix = config.get("key_prefix", "sessions/")
        self.access_key_id = config.get("access_key_id")
        self.secret_access_key = config.get("secret_access_key")
        self._s3_client = None

    async def _get_s3_client(self) -> Any:
        """Get or create the S3 client."""
        if self._s3_client is None:
            try:
                import boto3
                from botocore.client import Config

                session = boto3.Session(
                    aws_access_key_id=self.access_key_id,
                    aws_secret_access_key=self.secret_access_key,
                    region_name=self.region,
                )

                self._s3_client = session.client(
                    "s3",
                    config=Config(retries={"max_attempts": 3}, max_pool_connections=50),
                )
            except ImportError as exc:
                msg = "Boto3 package not installed. Install with: pip install boto3"
                raise ImportError(msg) from exc

        return self._s3_client

    def _get_key(self, session_id: str) -> str:
        """Get S3 key for session."""
        return f"{self.key_prefix}{session_id}.json.gz"
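
    # Example key (illustrative values): key_prefix="sessions/" and
    # session_id="abc123" produce the object key "sessions/abc123.json.gz".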

    async def store_session(
        self,
        session_state: SessionState,
        ttl_seconds: int | None = None,
    ) -> bool:
        """Store session in S3."""
        try:
            s3_client = await self._get_s3_client()

            # Serialize and compress session state
            serialized = json.dumps(session_state.to_dict())
            compressed = gzip.compress(serialized.encode("utf-8"))

            # Prepare S3 object metadata
            metadata = {
                "user_id": session_state.user_id,
                "project_id": session_state.project_id,
                "created_at": session_state.created_at,
                "last_activity": session_state.last_activity,
            }

            # If a TTL is given, compute an expiry timestamp. Note that the
            # "Expires" argument below only sets the HTTP Expires header on the
            # object; actual deletion relies on bucket lifecycle rules.
            expires = None
            if ttl_seconds:
                expires = datetime.now(UTC) + timedelta(seconds=ttl_seconds)

            # Upload to S3
            key = self._get_key(session_state.session_id)

            put_args = {
                "Bucket": self.bucket_name,
                "Key": key,
                "Body": compressed,
                "ContentType": "application/json",
                "ContentEncoding": "gzip",
                "Metadata": metadata,
            }

            if expires:
                put_args["Expires"] = expires

            # Execute in a thread pool since boto3 is synchronous
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, lambda: s3_client.put_object(**put_args))

            return True

        except Exception as e:
            self.logger.exception(
                f"Failed to store session {session_state.session_id}: {e}",
            )
            return False

    async def retrieve_session(self, session_id: str) -> SessionState | None:
        """Retrieve session from S3."""
        try:
            s3_client = await self._get_s3_client()
            key = self._get_key(session_id)

            # Download from S3
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: s3_client.get_object(Bucket=self.bucket_name, Key=key),
            )

            # Decompress and deserialize
            compressed_data = response["Body"].read()
            serialized = gzip.decompress(compressed_data).decode("utf-8")
            session_data = json.loads(serialized)

            return SessionState.from_dict(session_data)

        except Exception as e:
            self.logger.exception(f"Failed to retrieve session {session_id}: {e}")
            return None

    async def delete_session(self, session_id: str) -> bool:
        """Delete session from S3."""
        try:
            s3_client = await self._get_s3_client()
            key = self._get_key(session_id)

            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: s3_client.delete_object(Bucket=self.bucket_name, Key=key),
            )

            return True

        except Exception as e:
            self.logger.exception(f"Failed to delete session {session_id}: {e}")
            return False

    async def list_sessions(
        self,
        user_id: str | None = None,
        project_id: str | None = None,
    ) -> list[str]:
        """List sessions in S3."""
        try:
            s3_client = await self._get_s3_client()
            s3_objects = await self._get_s3_objects(s3_client)

            session_ids = []
            for obj in s3_objects:
                key = obj["Key"]
                session_id = self._extract_session_id_from_key(key)

                if await self._should_include_s3_session(
                    s3_client,
                    key,
                    user_id,
                    project_id,
                ):
                    session_ids.append(session_id)

            return session_ids

        except Exception as e:
            self.logger.exception(f"Failed to list sessions: {e}")
            return []

    async def _get_s3_objects(self, s3_client: Any) -> list[dict[str, Any]]:
        """Get S3 objects with the configured prefix."""
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: s3_client.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=self.key_prefix,
            ),
        )
        contents = response.get("Contents", [])
        return list(contents) if contents else []
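
    # Note: list_objects_v2 returns at most 1,000 keys per call. A hedged sketch
    # (not part of the original implementation) of handling larger prefixes with
    # boto3's paginator:
    #
    #     paginator = s3_client.get_paginator("list_objects_v2")
    #     objects: list[dict[str, Any]] = []
    #     for page in paginator.paginate(Bucket=self.bucket_name, Prefix=self.key_prefix):
    #         objects.extend(page.get("Contents", []))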

    def _extract_session_id_from_key(self, key: str) -> str:
        """Extract session ID from S3 object key."""
        # Strip only the leading prefix and trailing suffix; str.replace would
        # also rewrite matching substrings inside the session ID itself.
        return key.removeprefix(self.key_prefix).removesuffix(".json.gz")

    async def _should_include_s3_session(
        self,
        s3_client: Any,
        key: str,
        user_id: str | None,
        project_id: str | None,
    ) -> bool:
        """Check if S3 session should be included based on filters."""
        if not user_id and not project_id:
            return True

        metadata = await self._get_s3_object_metadata(s3_client, key)

        if user_id and metadata.get("user_id") != user_id:
            return False
        # Include the session unless a project filter is set and does not match.
        return not (project_id and metadata.get("project_id") != project_id)

    async def _get_s3_object_metadata(self, s3_client: Any, key: str) -> dict[str, Any]:
        """Get metadata for an S3 object."""
        loop = asyncio.get_running_loop()
        head_response = await loop.run_in_executor(
            None,
            lambda: s3_client.head_object(Bucket=self.bucket_name, Key=key),
        )
        metadata = head_response.get("Metadata", {})
        return dict(metadata) if metadata else {}

    async def cleanup_expired_sessions(self) -> int:
        """Clean up expired sessions from S3."""
        try:
            s3_client = await self._get_s3_client()

            # S3 lifecycle policies handle expiration automatically;
            # this implements custom cleanup for old session objects.

            now = datetime.now(UTC)
            cleaned = 0

            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: s3_client.list_objects_v2(
                    Bucket=self.bucket_name,
                    Prefix=self.key_prefix,
                ),
            )

            for obj in response.get("Contents", []):
                # boto3 returns LastModified as a timezone-aware datetime, so
                # compare it against the aware "now" directly instead of
                # stripping tzinfo (which would make the subtraction fail).
                age_days = (now - obj["LastModified"]).days

                if age_days > 30:  # Clean up sessions older than 30 days
                    key = obj["Key"]
                    await loop.run_in_executor(
                        None,
                        lambda key=key: s3_client.delete_object(
                            Bucket=self.bucket_name,
                            Key=key,
                        ),
                    )
                    cleaned += 1

            return cleaned

        except Exception as e:
            self.logger.exception(f"Failed to cleanup expired sessions: {e}")
            return 0
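
    # Hedged sketch (an assumption, not part of this module): the bucket-lifecycle
    # expiration mentioned above could be configured once via boto3 roughly as:
    #
    #     s3_client.put_bucket_lifecycle_configuration(
    #         Bucket=self.bucket_name,
    #         LifecycleConfiguration={
    #             "Rules": [
    #                 {
    #                     "ID": "expire-old-sessions",
    #                     "Filter": {"Prefix": self.key_prefix},
    #                     "Status": "Enabled",
    #                     "Expiration": {"Days": 30},
    #                 }
    #             ]
    #         },
    #     )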

    async def is_available(self) -> bool:
        """Check if S3 is available."""
        try:
            s3_client = await self._get_s3_client()

            # Test bucket access
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: s3_client.head_bucket(Bucket=self.bucket_name),
            )

            return True
        except Exception:
            return False
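

# Hedged usage sketch (not part of the original module): exercises only methods
# defined above. The bucket name is a placeholder; with no keys in the config,
# boto3 falls back to its default credential chain. It also assumes the
# SessionStorage base class accepts this config dict and provides self.logger.
if __name__ == "__main__":

    async def _demo() -> None:
        storage = S3Storage(
            {
                "bucket_name": "my-sessions-bucket",
                "region": "us-east-1",
                "key_prefix": "sessions/",
            }
        )
        if await storage.is_available():
            print(await storage.list_sessions())

    asyncio.run(_demo())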