Coverage for session_buddy / utils / database_helpers.py: 13.33%
65 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Database resolution utilities for MCP tools.
4This module provides reusable database resolution and operation patterns to eliminate
5code duplication in tool implementations that depend on databases.
6"""
8from __future__ import annotations
10from typing import TYPE_CHECKING, Any, TypeVar
12from session_buddy.utils.error_handlers import DatabaseUnavailableError, _get_logger
13from session_buddy.utils.instance_managers import (
14 get_reflection_database as resolve_reflection_database,
15)
17if TYPE_CHECKING:
18 from collections.abc import Awaitable, Callable
20 from session_buddy.adapters.reflection_adapter import ReflectionDatabaseAdapter
22T = TypeVar("T")
async def require_reflection_database() -> ReflectionDatabaseAdapter:
    """Return the reflection database, raising if it cannot be resolved.

    Consolidates the common sequence of resolving the database, checking
    the result for ``None`` and raising a descriptive error, so callers can
    use the returned adapter without repeating that check.

    Returns:
        The adapter returned by ``resolve_reflection_database()``.

    Raises:
        DatabaseUnavailableError: If resolution yields ``None``.

    Example:
        >>> db = await require_reflection_database()
        >>> # Use db knowing it's not None

    """
    db = await resolve_reflection_database()
    # Compare against None explicitly: a truthiness test would also reject a
    # live adapter whose class happens to define __bool__ or __len__.
    if db is None:
        msg = "Reflection database not available. Install dependencies: uv sync --extra embeddings"
        raise DatabaseUnavailableError(msg)
    return db
51async def safe_database_operation[T](
52 operation: Callable[[ReflectionDatabaseAdapter], Awaitable[T]],
53 error_message: str = "Database operation",
54) -> T:
55 """Execute database operation with automatic database resolution and error handling.
57 This utility wraps database operations to eliminate the repetitive pattern of:
58 1. Get database
59 2. Check if available
60 3. Execute operation
61 4. Handle errors
63 Args:
64 operation: Async function that takes database and returns result
65 error_message: Description of operation for error messages
67 Returns:
68 Result from the operation
70 Raises:
71 DatabaseUnavailableError: If database is not available
72 Exception: Any exception from the operation (will be logged)
74 Example:
75 >>> async def my_query(db):
76 ... return await db.search_reflections("test")
77 >>> results = await safe_database_operation(my_query, "Search reflections")
79 """
80 try:
81 db = await require_reflection_database()
82 return await operation(db)
83 except DatabaseUnavailableError:
84 # Re-raise database unavailable - caller should handle
85 raise
86 except Exception as e:
87 _get_logger().exception(f"Error in {error_message}: {e}")
88 raise
91async def safe_database_operation_with_message[T](
92 operation: Callable[[ReflectionDatabaseAdapter], Awaitable[T]],
93 error_message: str = "Database operation",
94) -> str:
95 """Execute database operation and return formatted string result.
97 Similar to safe_database_operation but catches all exceptions and returns
98 error messages as strings instead of raising.
100 Args:
101 operation: Async function that takes database and returns result
102 error_message: Description of operation for error messages
104 Returns:
105 String result from operation or error message
107 Example:
108 >>> async def my_query(db):
109 ... result = await db.search_reflections("test")
110 ... return f"Found {len(result)} results"
111 >>> message = await safe_database_operation_with_message(my_query)
112 >>> print(message)
114 """
115 try:
116 db = await require_reflection_database()
117 result = await operation(db)
118 # If operation returns a string, return it directly
119 if isinstance(result, str):
120 return result
121 # Otherwise, let caller handle the result
122 return str(result)
123 except DatabaseUnavailableError as e:
124 return f"❌ {e!s}"
125 except Exception as e:
126 _get_logger().exception(f"Error in {error_message}: {e}")
127 return f"❌ {error_message} failed: {e!s}"
async def batch_database_operation(
    items: list[T],
    operation: Callable[[ReflectionDatabaseAdapter, T], Awaitable[Any]],
    batch_size: int = 100,
) -> list[Any]:
    """Apply ``operation`` to every item, walking the list in chunks.

    The database is resolved once and reused for all items. Items are
    awaited one at a time, in order; ``batch_size`` only controls how the
    list is chunked while iterating. A failure on one item is logged and
    recorded as ``None`` so the remaining items are still processed.

    Args:
        items: List of items to process.
        operation: Async function that takes (database, item) and returns a result.
        batch_size: Number of items per chunk; must be at least 1.

    Returns:
        List of results in the same order as ``items``, with ``None`` in the
        position of every item whose operation raised.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
        DatabaseUnavailableError: If database is not available.

    Example:
        >>> async def store_item(db, item):
        ...     return await db.store_reflection(item["content"], item["tags"])
        >>> items = [{"content": "a", "tags": ["t1"]}, ...]
        >>> results = await batch_database_operation(items, store_item)

    """
    # Reject bad sizes up front: a negative step makes range() empty, which
    # would silently return [] and drop every item.
    if batch_size < 1:
        msg = f"batch_size must be a positive integer, got {batch_size!r}"
        raise ValueError(msg)

    db = await require_reflection_database()

    results: list[Any] = []
    for start in range(0, len(items), batch_size):
        for item in items[start : start + batch_size]:
            try:
                results.append(await operation(db, item))
            except Exception as e:
                # Best-effort: keep the slot so output stays aligned with input.
                _get_logger().exception(f"Error processing item {item}: {e}")
                results.append(None)
    return results
def check_database_available() -> bool:
    """Check if reflection database dependencies are available.

    This is a synchronous check that can be used before attempting async
    operations. It only asks the import system whether the
    ``session_buddy.reflection_tools`` and ``duckdb`` modules can be
    located; it does not open a database.

    Returns:
        True if both modules can be located, False otherwise (including
        when the lookup itself fails).

    Example:
        >>> if check_database_available():
        ...     result = await some_database_operation()
        ... else:
        ...     print("Database not available")

    """
    try:
        import importlib.util

        # Every required module must be locatable; all() stops at the first miss.
        return all(
            importlib.util.find_spec(module_name) is not None
            for module_name in ("session_buddy.reflection_tools", "duckdb")
        )
    except (ImportError, ValueError):
        # find_spec raises ModuleNotFoundError (an ImportError) when a parent
        # package is missing, and ValueError when a module already in
        # sys.modules has no usable __spec__. Both are reported as unavailable
        # so this function always returns a bool.
        return False
async def get_database_stats() -> dict[str, Any]:
    """Collect database statistics without ever raising.

    On success the dictionary returned by the database's ``get_stats()`` is
    tagged with ``"available": True`` and returned. On any failure a
    fallback dictionary with zeroed counters and an ``"error"`` description
    is returned instead.

    Returns:
        Dictionary with database statistics; the ``"available"`` key tells
        the two cases apart.

    Example:
        >>> stats = await get_database_stats()
        >>> print(f"Total reflections: {stats['total_reflections']}")

    """

    def unavailable(reason: str) -> dict[str, Any]:
        # Shared shape for both failure paths.
        return {
            "available": False,
            "error": reason,
            "total_reflections": 0,
            "total_conversations": 0,
        }

    try:
        database = await require_reflection_database()
        report = await database.get_stats()
        report["available"] = True
    except DatabaseUnavailableError:
        return unavailable("Database not available")
    except Exception as exc:
        _get_logger().exception(f"Error getting database stats: {exc}")
        return unavailable(str(exc))
    return report