Coverage for session_buddy / tools / serverless_tools.py: 24.55%
139 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Serverless session management MCP tools.
4This module provides tools for managing serverless sessions with external storage
5following crackerjack architecture patterns.
7Refactored to use utility modules for reduced code duplication.
8"""
10from __future__ import annotations
12import operator
13from typing import TYPE_CHECKING, Any
15from session_buddy.utils.error_handlers import _get_logger
16from session_buddy.utils.instance_managers import (
17 get_serverless_manager as resolve_serverless_manager,
18)
19from session_buddy.utils.messages import ToolMessages
21if TYPE_CHECKING:
22 from collections.abc import Awaitable, Callable
24 from fastmcp import FastMCP
# ============================================================================
# Service Resolution
# ============================================================================
async def _require_serverless_manager() -> Any:
    """Return the serverless manager, or fail when none can be resolved.

    Returns:
        The object produced by ``resolve_serverless_manager()``.

    Raises:
        RuntimeError: If the resolver yields ``None``. The message names the
            optional packages the user should install.
    """
    manager = await resolve_serverless_manager()
    if manager is None:
        msg = "Serverless mode not available. Install dependencies: pip install redis boto3"
        raise RuntimeError(msg)
    return manager
async def _execute_serverless_operation(
    operation_name: str, operation: Callable[[Any], Awaitable[str]]
) -> str:
    """Run ``operation`` against the serverless manager, turning errors into text.

    Args:
        operation_name: Human-readable label used in the log entry and in the
            failure message.
        operation: Coroutine function that receives the manager and returns
            the message to hand back to the caller.

    Returns:
        The operation's own message on success, otherwise an error string.
        ``Exception`` subclasses are never propagated to the caller.
    """
    try:
        manager = await _require_serverless_manager()
        return await operation(manager)
    except RuntimeError as e:
        # Includes the "serverless mode not available" error raised by
        # _require_serverless_manager, as well as any RuntimeError raised by
        # the operation itself. Shown verbatim and not logged.
        return f"❌ {e!s}"
    except Exception as e:
        # Anything else is unexpected: record the traceback, then report.
        _get_logger().exception(f"Error in {operation_name}: {e}")
        return ToolMessages.operation_failed(operation_name, e)
# ============================================================================
# Session Operations
# ============================================================================
async def _create_serverless_session_operation(
    manager: Any,
    user_id: str,
    project_id: str,
    session_data: dict[str, Any] | None,
    ttl_hours: int,
) -> str:
    """Create a serverless session through ``manager`` and describe the result.

    Args:
        manager: Object exposing an awaitable ``create_session`` method.
        user_id: Forwarded to ``create_session`` as ``user_id``.
        project_id: Forwarded to ``create_session`` as ``project_id``.
        session_data: Forwarded unchanged; may be ``None``.
        ttl_hours: Forwarded as ``ttl_hours`` and echoed in the message.

    Returns:
        A two-line confirmation containing the new session id and the TTL.
    """
    session_id = await manager.create_session(
        user_id=user_id,
        project_id=project_id,
        session_data=session_data,
        ttl_hours=ttl_hours,
    )
    return f"✅ Created serverless session: {session_id}\n🕐 TTL: {ttl_hours} hours"
async def _create_serverless_session_impl(
    user_id: str,
    project_id: str,
    session_data: dict[str, Any] | None = None,
    ttl_hours: int = 24,
) -> str:
    """Create a serverless session, reporting any failure as a message.

    Binds the arguments into a single-parameter coroutine and hands it to
    ``_execute_serverless_operation``, which supplies the manager and converts
    errors into text.

    Returns:
        The confirmation or error message for the caller.
    """

    async def operation_wrapper(manager: Any) -> str:
        return await _create_serverless_session_operation(
            manager, user_id, project_id, session_data, ttl_hours
        )

    return await _execute_serverless_operation(
        "Create serverless session",
        operation_wrapper,
    )
async def _get_serverless_session_operation(manager: Any, session_id: str) -> str:
    """Fetch one session through ``manager`` and render it as text.

    Args:
        manager: Object exposing an awaitable ``get_session(session_id)``.
        session_id: Identifier of the session to look up.

    Returns:
        A "not found" message when the lookup result is falsy; otherwise a
        multi-line summary of the session followed by one bullet per entry
        in its ``session_data`` mapping.

    Raises:
        KeyError: If the session lacks ``user_id``, ``project_id``,
            ``created_at`` or ``expires_at``.
    """
    session = await manager.get_session(session_id)

    if not session:
        return f"❌ Session not found: {session_id}"

    lines = [
        f"📦 Serverless Session: {session_id}",
        "",
        f"👤 User ID: {session['user_id']}",
        f"📁 Project ID: {session['project_id']}",
        f"📅 Created: {session['created_at']}",
        f"⏱️ Expires: {session['expires_at']}",
        "",
        "📊 Session Data:",
    ]

    # A session may be created with session_data=None, in which case the key
    # exists but holds None; `or {}` covers both the missing and None cases.
    data = session.get("session_data") or {}
    lines.extend(f" • {key}: {value}" for key, value in data.items())

    return "\n".join(lines)
async def _get_serverless_session_impl(session_id: str) -> str:
    """Look up a serverless session, reporting any failure as a message.

    Delegates to ``_get_serverless_session_operation`` through
    ``_execute_serverless_operation``, which supplies the manager and converts
    errors into text.
    """

    async def operation_wrapper(manager: Any) -> str:
        return await _get_serverless_session_operation(manager, session_id)

    return await _execute_serverless_operation(
        "Get serverless session",
        operation_wrapper,
    )
async def _update_serverless_session_operation(
    manager: Any,
    session_id: str,
    session_data: dict[str, Any],
    extend_ttl_hours: int | None,
) -> str:
    """Update a session through ``manager`` and describe the outcome.

    Args:
        manager: Object exposing an awaitable ``update_session`` method.
        session_id: Identifier of the session to update.
        session_data: Forwarded to ``update_session`` as ``session_data``.
        extend_ttl_hours: Forwarded as ``extend_ttl_hours``; may be ``None``.

    Returns:
        A "not found" message when ``update_session`` returns a falsy value,
        otherwise a confirmation. The TTL line is added only when
        ``extend_ttl_hours`` is truthy, so ``None`` and ``0`` both omit it.
    """
    success = await manager.update_session(
        session_id=session_id,
        session_data=session_data,
        extend_ttl_hours=extend_ttl_hours,
    )

    if not success:
        return f"❌ Session not found: {session_id}"

    lines = [f"✅ Updated session: {session_id}"]
    if extend_ttl_hours:
        lines.append(f"⏱️ Extended TTL by {extend_ttl_hours} hours")
    return "\n".join(lines)
async def _update_serverless_session_impl(
    session_id: str,
    session_data: dict[str, Any],
    extend_ttl_hours: int | None = None,
) -> str:
    """Update a serverless session, reporting any failure as a message.

    Delegates to ``_update_serverless_session_operation`` through
    ``_execute_serverless_operation``, which supplies the manager and converts
    errors into text.
    """

    async def operation_wrapper(manager: Any) -> str:
        return await _update_serverless_session_operation(
            manager, session_id, session_data, extend_ttl_hours
        )

    return await _execute_serverless_operation(
        "Update serverless session",
        operation_wrapper,
    )
async def _delete_serverless_session_operation(manager: Any, session_id: str) -> str:
    """Delete a session through ``manager`` and describe the outcome.

    Args:
        manager: Object exposing an awaitable ``delete_session(session_id)``.
        session_id: Identifier of the session to delete.

    Returns:
        A confirmation when ``delete_session`` returns a truthy value,
        otherwise a "not found" message.
    """
    success = await manager.delete_session(session_id)

    if not success:
        return f"❌ Session not found: {session_id}"

    return f"✅ Deleted session: {session_id}"
async def _delete_serverless_session_impl(session_id: str) -> str:
    """Delete a serverless session, reporting any failure as a message.

    Delegates to ``_delete_serverless_session_operation`` through
    ``_execute_serverless_operation``, which supplies the manager and converts
    errors into text.
    """

    async def operation_wrapper(manager: Any) -> str:
        return await _delete_serverless_session_operation(manager, session_id)

    return await _execute_serverless_operation(
        "Delete serverless session",
        operation_wrapper,
    )
# ============================================================================
# List and Cleanup Operations
# ============================================================================
async def _list_serverless_sessions_operation(
    manager: Any,
    user_id: str | None,
    project_id: str | None,
    include_expired: bool,
) -> str:
    """List sessions through ``manager`` and render them as text.

    Args:
        manager: Object exposing an awaitable ``list_sessions`` method.
        user_id: Optional filter, forwarded as ``user_id``.
        project_id: Optional filter, forwarded as ``project_id``.
        include_expired: Forwarded as ``include_expired``.

    Returns:
        A "no sessions found" message when the result is empty, naming any
        truthy filters that were applied. Otherwise a header line followed by
        a four-line entry and a blank line per session.

    Raises:
        KeyError: If a session lacks ``session_id``, ``user_id``,
            ``project_id`` or ``expires_at``.
    """
    sessions = await manager.list_sessions(
        user_id=user_id,
        project_id=project_id,
        include_expired=include_expired,
    )

    if not sessions:
        # Echo the filters back so an empty result is easier to interpret.
        filters = []
        if user_id:
            filters.append(f"user_id={user_id}")
        if project_id:
            filters.append(f"project_id={project_id}")
        filter_str = f" ({', '.join(filters)})" if filters else ""
        return f"🔍 No sessions found{filter_str}"

    lines = [
        f"📦 Found {len(sessions)} serverless session(s):",
        "",
    ]

    for session in sessions:
        lines.extend(
            [
                f"• Session ID: {session['session_id']}",
                f" User: {session['user_id']}",
                f" Project: {session['project_id']}",
                f" Expires: {session['expires_at']}",
                "",
            ]
        )

    return "\n".join(lines)
async def _list_serverless_sessions_impl(
    user_id: str | None = None,
    project_id: str | None = None,
    include_expired: bool = False,
) -> str:
    """List serverless sessions, reporting any failure as a message.

    Delegates to ``_list_serverless_sessions_operation`` through
    ``_execute_serverless_operation``, which supplies the manager and converts
    errors into text.
    """

    async def operation_wrapper(manager: Any) -> str:
        return await _list_serverless_sessions_operation(
            manager, user_id, project_id, include_expired
        )

    return await _execute_serverless_operation(
        "List serverless sessions",
        operation_wrapper,
    )
async def _cleanup_serverless_sessions_operation(manager: Any) -> str:
    """Remove expired sessions through ``manager`` and report how many went.

    Args:
        manager: Object exposing an awaitable ``cleanup_expired_sessions()``.

    Returns:
        A confirmation embedding whatever count the manager returned.
    """
    deleted_count = await manager.cleanup_expired_sessions()
    return f"✅ Cleaned up {deleted_count} expired session(s)"
async def _cleanup_serverless_sessions_impl() -> str:
    """Clean up expired serverless sessions, reporting any failure as a message.

    The operation needs no bound arguments, so it is passed directly to
    ``_execute_serverless_operation`` without a wrapper closure.
    """
    return await _execute_serverless_operation(
        "Cleanup serverless sessions",
        _cleanup_serverless_sessions_operation,
    )
# ============================================================================
# Storage Testing and Configuration
# ============================================================================
def _format_storage_test_results(results: dict[str, Any]) -> list[str]:
    """Render per-backend storage test results as display lines.

    Args:
        results: Mapping of backend name to a result mapping. Each result
            must carry ``available``. ``latency_ms`` and ``status`` are
            optional for available backends and ``error`` is optional for
            unavailable ones; placeholders are shown when they are missing.

    Returns:
        A header and a blank line, then for each backend a status line, its
        detail line(s) and a trailing blank line.

    Raises:
        KeyError: If a result has no ``available`` key.
    """
    lines = [
        "🧪 Storage Backend Test Results:",
        "",
    ]

    for backend, result in results.items():
        status = "✅" if result["available"] else "❌"
        lines.append(f"{status} {backend.upper()}:")

        if result["available"]:
            lines.extend(
                (
                    f" Latency: {result.get('latency_ms', 'N/A')} ms",
                    f" Status: {result.get('status', 'OK')}",
                )
            )
        else:
            lines.append(f" Error: {result.get('error', 'Unknown')}")

        lines.append("")

    return lines
async def _test_serverless_storage_operation(manager: Any) -> str:
    """Test the storage backends through ``manager`` and recommend the fastest.

    Args:
        manager: Object exposing an awaitable ``test_storage_backends()`` that
            returns a mapping of backend name to result mapping.

    Returns:
        The formatted per-backend results followed by one line: either the
        recommended backend or a warning that none is available.
    """
    results = await manager.test_storage_backends()

    lines = _format_storage_test_results(results)

    # Add recommendation
    available = {name: res for name, res in results.items() if res["available"]}
    if available:

        def latency_of(name: str) -> float:
            # The formatter treats latency_ms as optional, so an available
            # backend may not report one. Rank missing or non-numeric values
            # last instead of raising.
            latency = available[name].get("latency_ms")
            return latency if isinstance(latency, (int, float)) else float("inf")

        # min() keeps the first backend on ties.
        fastest = min(available, key=latency_of)
        lines.append(f"💡 Recommended: {fastest.upper()} (lowest latency)")
    else:
        lines.append("⚠️ No storage backends available")

    return "\n".join(lines)
async def _test_serverless_storage_impl() -> str:
    """Test the storage backends, reporting any failure as a message.

    The operation needs no bound arguments, so it is passed directly to
    ``_execute_serverless_operation`` without a wrapper closure.
    """
    return await _execute_serverless_operation(
        "Test serverless storage",
        _test_serverless_storage_operation,
    )
async def _configure_serverless_storage_operation(
    manager: Any,
    backend: str,
    config: dict[str, Any],
) -> str:
    """Configure a storage backend through ``manager`` and describe the result.

    Args:
        manager: Object exposing an awaitable
            ``configure_storage(backend=..., config=...)``.
        backend: Backend name, forwarded as given and upper-cased only in the
            success message.
        config: Backend settings; every key/value pair is echoed back in the
            success message.

    Returns:
        A failure message when ``configure_storage`` returns a falsy value,
        otherwise a confirmation followed by one bullet per config entry.
    """
    success = await manager.configure_storage(backend=backend, config=config)

    if not success:
        return f"❌ Failed to configure {backend} storage"

    return "\n".join(
        [
            f"✅ Configured {backend.upper()} storage backend",
            "",
            "⚙️ Configuration:",
            *[f" • {key}: {value}" for key, value in config.items()],
        ]
    )
async def _configure_serverless_storage_impl(
    backend: str,
    config: dict[str, Any],
) -> str:
    """Configure a storage backend, reporting any failure as a message.

    Delegates to ``_configure_serverless_storage_operation`` through
    ``_execute_serverless_operation``, which supplies the manager and converts
    errors into text.
    """

    async def operation_wrapper(manager: Any) -> str:
        return await _configure_serverless_storage_operation(manager, backend, config)

    return await _execute_serverless_operation(
        "Configure serverless storage",
        operation_wrapper,
    )
# ============================================================================
# MCP Tool Registration
# ============================================================================
def register_serverless_tools(mcp: FastMCP) -> None:
    """Register all serverless session management tools on ``mcp``.

    Each tool is a thin coroutine decorated with ``mcp.tool()`` that forwards
    its arguments to the matching module-level ``_*_impl`` function. Eight
    tools are registered, in this order: create, get, update, delete, list,
    cleanup, test storage, configure storage.

    Args:
        mcp: Server object whose ``tool()`` method returns a decorator.
    """

    @mcp.tool()  # type: ignore[misc]
    async def create_serverless_session(
        user_id: str,
        project_id: str,
        session_data: dict[str, Any] | None = None,
        ttl_hours: int = 24,
    ) -> str:
        """Create a new serverless session with external storage."""
        return await _create_serverless_session_impl(
            user_id, project_id, session_data, ttl_hours
        )

    @mcp.tool()  # type: ignore[misc]
    async def get_serverless_session(session_id: str) -> str:
        """Get serverless session state from external storage."""
        return await _get_serverless_session_impl(session_id)

    @mcp.tool()  # type: ignore[misc]
    async def update_serverless_session(
        session_id: str,
        session_data: dict[str, Any],
        extend_ttl_hours: int | None = None,
    ) -> str:
        """Update serverless session data and optionally extend TTL."""
        return await _update_serverless_session_impl(
            session_id, session_data, extend_ttl_hours
        )

    @mcp.tool()  # type: ignore[misc]
    async def delete_serverless_session(session_id: str) -> str:
        """Delete a serverless session from external storage."""
        return await _delete_serverless_session_impl(session_id)

    @mcp.tool()  # type: ignore[misc]
    async def list_serverless_sessions(
        user_id: str | None = None,
        project_id: str | None = None,
        include_expired: bool = False,
    ) -> str:
        """List serverless sessions with optional filtering."""
        return await _list_serverless_sessions_impl(
            user_id, project_id, include_expired
        )

    @mcp.tool()  # type: ignore[misc]
    async def cleanup_serverless_sessions() -> str:
        """Clean up expired serverless sessions from storage."""
        return await _cleanup_serverless_sessions_impl()

    @mcp.tool()  # type: ignore[misc]
    async def test_serverless_storage() -> str:
        """Test all configured storage backends (Redis, S3, local)."""
        return await _test_serverless_storage_impl()

    @mcp.tool()  # type: ignore[misc]
    async def configure_serverless_storage(
        backend: str,
        config: dict[str, Any],
    ) -> str:
        """Configure storage backend for serverless sessions."""
        return await _configure_serverless_storage_impl(backend, config)