Coverage for session_buddy / tools / serverless_tools.py: 24.55%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""Serverless session management MCP tools. 

3 

4This module provides tools for managing serverless sessions with external storage 

5following crackerjack architecture patterns. 

6 

7Refactored to use utility modules for reduced code duplication. 

8""" 

9 

10from __future__ import annotations 

11 

12import operator 

13from typing import TYPE_CHECKING, Any 

14 

15from session_buddy.utils.error_handlers import _get_logger 

16from session_buddy.utils.instance_managers import ( 

17 get_serverless_manager as resolve_serverless_manager, 

18) 

19from session_buddy.utils.messages import ToolMessages 

20 

21if TYPE_CHECKING: 

22 from collections.abc import Awaitable, Callable 

23 

24 from fastmcp import FastMCP 

25 

26 

27# ============================================================================ 

28# Service Resolution 

29# ============================================================================ 

30 

31 

async def _require_serverless_manager() -> Any:
    """Resolve the serverless manager, failing when none is available.

    Returns:
        The object produced by ``resolve_serverless_manager`` when it is
        not ``None``.

    Raises:
        RuntimeError: If the resolver returns ``None``; the message tells
            the user which packages to install.
    """
    resolved = await resolve_serverless_manager()
    if resolved is not None:
        return resolved

    raise RuntimeError(
        "Serverless mode not available. Install dependencies: pip install redis boto3"
    )

39 

40 

async def _execute_serverless_operation(
    operation_name: str, operation: Callable[[Any], Awaitable[str]]
) -> str:
    """Run *operation* against the serverless manager and report failures as text.

    Args:
        operation_name: Human-readable label used in log and failure messages.
        operation: Coroutine function that receives the manager and returns
            the text to hand back to the caller.

    Returns:
        The operation's result on success. A ``RuntimeError`` (including the
        one raised when no manager is available) is returned as its bare
        message; any other exception is logged and rendered through
        ``ToolMessages.operation_failed``.
    """
    try:
        serverless_manager = await _require_serverless_manager()
        outcome = await operation(serverless_manager)
    except RuntimeError as exc:
        # RuntimeError messages are passed through to the caller untouched.
        return str(exc)
    except Exception as exc:
        _get_logger().exception(f"Error in {operation_name}: {exc}")
        return ToolMessages.operation_failed(operation_name, exc)
    return outcome

53 

54 

55# ============================================================================ 

56# Session Operations 

57# ============================================================================ 

58 

59 

60async def _create_serverless_session_operation( 

61 manager: Any, 

62 user_id: str, 

63 project_id: str, 

64 session_data: dict[str, Any] | None, 

65 ttl_hours: int, 

66) -> str: 

67 """Create a new serverless session with external storage.""" 

68 session_id = await manager.create_session( 

69 user_id=user_id, 

70 project_id=project_id, 

71 session_data=session_data, 

72 ttl_hours=ttl_hours, 

73 ) 

74 return f"✅ Created serverless session: {session_id}\n🕐 TTL: {ttl_hours} hours" 

75 

76 

async def _create_serverless_session_impl(
    user_id: str,
    project_id: str,
    session_data: dict[str, Any] | None = None,
    ttl_hours: int = 24,
) -> str:
    """Create a serverless session through the shared error-handling wrapper.

    Binds the arguments to ``_create_serverless_session_operation`` and runs
    it via ``_execute_serverless_operation``, which supplies the manager.
    """
    return await _execute_serverless_operation(
        "Create serverless session",
        # The lambda returns the coroutine; the wrapper awaits it.
        lambda manager: _create_serverless_session_operation(
            manager, user_id, project_id, session_data, ttl_hours
        ),
    )

94 

95 

async def _get_serverless_session_operation(manager: Any, session_id: str) -> str:
    """Fetch a session from *manager* and render it as a text report.

    Args:
        manager: Object exposing an awaitable ``get_session(session_id)``.
        session_id: Identifier of the session to look up.

    Returns:
        A "not found" message when the manager returns a falsy value.
        Otherwise a multi-line report with the user id, project id, creation
        and expiry values, followed by one ``key: value`` line per entry of
        the session's ``session_data``.

    Raises:
        KeyError: If the session lacks ``user_id``, ``project_id``,
            ``created_at`` or ``expires_at``.
    """
    session = await manager.get_session(session_id)

    if not session:
        return f"❌ Session not found: {session_id}"

    lines = [
        f"📦 Serverless Session: {session_id}",
        "",
        f"👤 User ID: {session['user_id']}",
        f"📁 Project ID: {session['project_id']}",
        f"📅 Created: {session['created_at']}",
        f"⏱️ Expires: {session['expires_at']}",
        "",
        "📊 Session Data:",
    ]

    # ``.get(key, {})`` only covers a missing key. A stored ``None`` payload
    # would still come back as ``None`` and break ``.items()``, so normalise
    # any falsy payload to an empty mapping.
    data = session.get("session_data") or {}
    lines.extend(f"{key}: {value}" for key, value in data.items())

    return "\n".join(lines)

119 

120 

async def _get_serverless_session_impl(session_id: str) -> str:
    """Look up one serverless session through the shared error-handling wrapper.

    Binds *session_id* to ``_get_serverless_session_operation`` and runs it
    via ``_execute_serverless_operation``, which supplies the manager.
    """
    return await _execute_serverless_operation(
        "Get serverless session",
        # The lambda returns the coroutine; the wrapper awaits it.
        lambda manager: _get_serverless_session_operation(manager, session_id),
    )

131 

132 

133async def _update_serverless_session_operation( 

134 manager: Any, 

135 session_id: str, 

136 session_data: dict[str, Any], 

137 extend_ttl_hours: int | None, 

138) -> str: 

139 """Update serverless session data.""" 

140 success = await manager.update_session( 

141 session_id=session_id, 

142 session_data=session_data, 

143 extend_ttl_hours=extend_ttl_hours, 

144 ) 

145 

146 if not success: 

147 return f"❌ Session not found: {session_id}" 

148 

149 lines = [f"✅ Updated session: {session_id}"] 

150 if extend_ttl_hours: 

151 lines.append(f"⏱️ Extended TTL by {extend_ttl_hours} hours") 

152 return "\n".join(lines) 

153 

154 

async def _update_serverless_session_impl(
    session_id: str,
    session_data: dict[str, Any],
    extend_ttl_hours: int | None = None,
) -> str:
    """Update a serverless session through the shared error-handling wrapper.

    Binds the arguments to ``_update_serverless_session_operation`` and runs
    it via ``_execute_serverless_operation``, which supplies the manager.
    """
    return await _execute_serverless_operation(
        "Update serverless session",
        # The lambda returns the coroutine; the wrapper awaits it.
        lambda manager: _update_serverless_session_operation(
            manager, session_id, session_data, extend_ttl_hours
        ),
    )

171 

172 

async def _delete_serverless_session_operation(manager: Any, session_id: str) -> str:
    """Ask *manager* to delete a session and describe the outcome.

    Returns:
        A confirmation when ``manager.delete_session`` returns a truthy
        value, otherwise a "not found" message.
    """
    removed = await manager.delete_session(session_id)
    if removed:
        return f"✅ Deleted session: {session_id}"
    return f"❌ Session not found: {session_id}"

181 

182 

async def _delete_serverless_session_impl(session_id: str) -> str:
    """Delete a serverless session through the shared error-handling wrapper.

    Binds *session_id* to ``_delete_serverless_session_operation`` and runs
    it via ``_execute_serverless_operation``, which supplies the manager.
    """
    return await _execute_serverless_operation(
        "Delete serverless session",
        # The lambda returns the coroutine; the wrapper awaits it.
        lambda manager: _delete_serverless_session_operation(manager, session_id),
    )

193 

194 

195# ============================================================================ 

196# List and Cleanup Operations 

197# ============================================================================ 

198 

199 

200async def _list_serverless_sessions_operation( 

201 manager: Any, 

202 user_id: str | None, 

203 project_id: str | None, 

204 include_expired: bool, 

205) -> str: 

206 """List serverless sessions with optional filtering.""" 

207 sessions = await manager.list_sessions( 

208 user_id=user_id, 

209 project_id=project_id, 

210 include_expired=include_expired, 

211 ) 

212 

213 if not sessions: 

214 filters = [] 

215 if user_id: 

216 filters.append(f"user_id={user_id}") 

217 if project_id: 

218 filters.append(f"project_id={project_id}") 

219 filter_str = f" ({', '.join(filters)})" if filters else "" 

220 return f"🔍 No sessions found{filter_str}" 

221 

222 lines = [ 

223 f"📦 Found {len(sessions)} serverless session(s):", 

224 "", 

225 ] 

226 

227 for session in sessions: 

228 lines.extend( 

229 [ 

230 f"• Session ID: {session['session_id']}", 

231 f" User: {session['user_id']}", 

232 f" Project: {session['project_id']}", 

233 f" Expires: {session['expires_at']}", 

234 "", 

235 ] 

236 ) 

237 

238 return "\n".join(lines) 

239 

240 

async def _list_serverless_sessions_impl(
    user_id: str | None = None,
    project_id: str | None = None,
    include_expired: bool = False,
) -> str:
    """List serverless sessions through the shared error-handling wrapper.

    Binds the filters to ``_list_serverless_sessions_operation`` and runs it
    via ``_execute_serverless_operation``, which supplies the manager.
    """
    return await _execute_serverless_operation(
        "List serverless sessions",
        # The lambda returns the coroutine; the wrapper awaits it.
        lambda manager: _list_serverless_sessions_operation(
            manager, user_id, project_id, include_expired
        ),
    )

257 

258 

async def _cleanup_serverless_sessions_operation(manager: Any) -> str:
    """Ask *manager* to purge expired sessions and report what it returned.

    Returns:
        A one-line message embedding the value returned by
        ``manager.cleanup_expired_sessions()``.
    """
    removed_count = await manager.cleanup_expired_sessions()
    summary = f"✅ Cleaned up {removed_count} expired session(s)"
    return summary

263 

264 

async def _cleanup_serverless_sessions_impl() -> str:
    """Purge expired sessions through the shared error-handling wrapper.

    ``_cleanup_serverless_sessions_operation`` takes only the manager, so it
    is handed to ``_execute_serverless_operation`` directly.
    """
    return await _execute_serverless_operation(
        operation_name="Cleanup serverless sessions",
        operation=_cleanup_serverless_sessions_operation,
    )

271 

272 

273# ============================================================================ 

274# Storage Testing and Configuration 

275# ============================================================================ 

276 

277 

def _format_storage_test_results(results: dict[str, Any]) -> list[str]:
    """Turn per-backend test results into report lines.

    Args:
        results: Mapping of backend name to a result mapping. Each result
            must contain ``available``; ``latency_ms``, ``status`` and
            ``error`` are optional and fall back to placeholder text.

    Returns:
        A title line and a blank line, then for each backend a status
        heading, its detail line(s), and a trailing blank line.
    """
    report = ["🧪 Storage Backend Test Results:", ""]

    for name, outcome in results.items():
        if outcome["available"]:
            report.append(f"✅ {name.upper()}:")
            report.append(f" Latency: {outcome.get('latency_ms', 'N/A')} ms")
            report.append(f" Status: {outcome.get('status', 'OK')}")
        else:
            report.append(f"❌ {name.upper()}:")
            report.append(f" Error: {outcome.get('error', 'Unknown')}")
        # A blank line separates one backend from the next.
        report.append("")

    return report

302 

303 

async def _test_serverless_storage_operation(manager: Any) -> str:
    """Probe every storage backend via *manager* and recommend the fastest.

    Args:
        manager: Object exposing an awaitable ``test_storage_backends()``
            that returns a mapping of backend name to result mapping. Each
            result must contain ``available``.

    Returns:
        The formatted per-backend report followed by either a recommendation
        for the available backend with the lowest ``latency_ms`` or a warning
        that no backend is available.
    """
    results = await manager.test_storage_backends()

    lines = _format_storage_test_results(results)

    available = [name for name, res in results.items() if res["available"]]
    if not available:
        lines.append("⚠️ No storage backends available")
        return "\n".join(lines)

    def _latency_of(name: str) -> float:
        # The report formatter treats ``latency_ms`` as optional, so the
        # ranking must too. A backend with no numeric latency sorts last
        # instead of raising KeyError/TypeError and failing the whole test.
        latency = results[name].get("latency_ms")
        if isinstance(latency, (int, float)):
            return latency
        return float("inf")

    # ``min`` keeps the first backend on ties, matching the mapping order.
    fastest = min(available, key=_latency_of)
    lines.append(f"💡 Recommended: {fastest.upper()} (lowest latency)")

    return "\n".join(lines)

326 

327 

async def _test_serverless_storage_impl() -> str:
    """Probe the storage backends through the shared error-handling wrapper.

    ``_test_serverless_storage_operation`` takes only the manager, so it is
    handed to ``_execute_serverless_operation`` directly.
    """
    return await _execute_serverless_operation(
        operation_name="Test serverless storage",
        operation=_test_serverless_storage_operation,
    )

334 

335 

async def _configure_serverless_storage_operation(
    manager: Any,
    backend: str,
    config: dict[str, Any],
) -> str:
    """Apply a storage configuration via *manager* and describe the outcome.

    Returns:
        A failure message when ``manager.configure_storage`` returns a falsy
        value. Otherwise a confirmation naming the upper-cased backend,
        followed by one ``key: value`` line per entry of *config*.
    """
    configured = await manager.configure_storage(backend=backend, config=config)
    if not configured:
        return f"❌ Failed to configure {backend} storage"

    summary = [
        f"✅ Configured {backend.upper()} storage backend",
        "",
        "⚙️ Configuration:",
    ]
    # Every config entry is echoed back verbatim.
    summary.extend(f"{key}: {value}" for key, value in config.items())
    return "\n".join(summary)

355 

356 

async def _configure_serverless_storage_impl(
    backend: str,
    config: dict[str, Any],
) -> str:
    """Configure a storage backend through the shared error-handling wrapper.

    Binds the arguments to ``_configure_serverless_storage_operation`` and
    runs it via ``_execute_serverless_operation``, which supplies the manager.
    """
    return await _execute_serverless_operation(
        "Configure serverless storage",
        # The lambda returns the coroutine; the wrapper awaits it.
        lambda manager: _configure_serverless_storage_operation(
            manager, backend, config
        ),
    )

370 

371 

372# ============================================================================ 

373# MCP Tool Registration 

374# ============================================================================ 

375 

376 

def register_serverless_tools(mcp: FastMCP) -> None:
    """Attach the serverless session tools to *mcp*.

    Each nested coroutine is decorated with ``mcp.tool()`` and does nothing
    but delegate to the module-level ``_*_impl`` function of the same name.

    Args:
        mcp: Server object whose ``tool()`` decorator performs registration.
    """

    # --- Session lifecycle ---------------------------------------------

    @mcp.tool()  # type: ignore[misc]
    async def create_serverless_session(
        user_id: str,
        project_id: str,
        session_data: dict[str, Any] | None = None,
        ttl_hours: int = 24,
    ) -> str:
        """Create a new serverless session with external storage."""
        return await _create_serverless_session_impl(
            user_id=user_id,
            project_id=project_id,
            session_data=session_data,
            ttl_hours=ttl_hours,
        )

    @mcp.tool()  # type: ignore[misc]
    async def get_serverless_session(session_id: str) -> str:
        """Get serverless session state from external storage."""
        return await _get_serverless_session_impl(session_id=session_id)

    @mcp.tool()  # type: ignore[misc]
    async def update_serverless_session(
        session_id: str,
        session_data: dict[str, Any],
        extend_ttl_hours: int | None = None,
    ) -> str:
        """Update serverless session data and optionally extend TTL."""
        return await _update_serverless_session_impl(
            session_id=session_id,
            session_data=session_data,
            extend_ttl_hours=extend_ttl_hours,
        )

    @mcp.tool()  # type: ignore[misc]
    async def delete_serverless_session(session_id: str) -> str:
        """Delete a serverless session from external storage."""
        return await _delete_serverless_session_impl(session_id=session_id)

    # --- Listing and cleanup -------------------------------------------

    @mcp.tool()  # type: ignore[misc]
    async def list_serverless_sessions(
        user_id: str | None = None,
        project_id: str | None = None,
        include_expired: bool = False,
    ) -> str:
        """List serverless sessions with optional filtering."""
        return await _list_serverless_sessions_impl(
            user_id=user_id,
            project_id=project_id,
            include_expired=include_expired,
        )

    @mcp.tool()  # type: ignore[misc]
    async def cleanup_serverless_sessions() -> str:
        """Clean up expired serverless sessions from storage."""
        return await _cleanup_serverless_sessions_impl()

    # --- Storage backends ----------------------------------------------

    @mcp.tool()  # type: ignore[misc]
    async def test_serverless_storage() -> str:
        """Test all configured storage backends (Redis, S3, local)."""
        return await _test_serverless_storage_impl()

    @mcp.tool()  # type: ignore[misc]
    async def configure_serverless_storage(
        backend: str,
        config: dict[str, Any],
    ) -> str:
        """Configure storage backend for serverless sessions."""
        return await _configure_serverless_storage_impl(
            backend=backend, config=config
        )