Coverage for session_buddy / utils / database_helpers.py: 13.33%

65 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""Database resolution utilities for MCP tools. 

3 

4This module provides reusable database resolution and operation patterns to eliminate 

5code duplication in tool implementations that depend on databases. 

6""" 

7 

8from __future__ import annotations 

9 

10from typing import TYPE_CHECKING, Any, TypeVar 

11 

12from session_buddy.utils.error_handlers import DatabaseUnavailableError, _get_logger 

13from session_buddy.utils.instance_managers import ( 

14 get_reflection_database as resolve_reflection_database, 

15) 

16 

17if TYPE_CHECKING: 

18 from collections.abc import Awaitable, Callable 

19 

20 from session_buddy.adapters.reflection_adapter import ReflectionDatabaseAdapter 

21 

22T = TypeVar("T") 

23 

24 

async def require_reflection_database() -> ReflectionDatabaseAdapter:
    """Resolve the reflection database, failing loudly when it is missing.

    Wraps the resolve-then-check sequence so callers can use the returned
    adapter without a separate availability test.

    Returns:
        The resolved ReflectionDatabaseAdapter instance.

    Raises:
        DatabaseUnavailableError: If the resolver yields a falsy value
            (for example ``None``).

    Example:
        >>> db = await require_reflection_database()
        >>> # db is guaranteed to be usable here

    """
    database = await resolve_reflection_database()
    # Guard clause: any truthy adapter is handed straight back to the caller.
    if database:
        return database
    raise DatabaseUnavailableError(
        "Reflection database not available. Install dependencies: uv sync --extra embeddings"
    )

49 

50 

51async def safe_database_operation[T]( 

52 operation: Callable[[ReflectionDatabaseAdapter], Awaitable[T]], 

53 error_message: str = "Database operation", 

54) -> T: 

55 """Execute database operation with automatic database resolution and error handling. 

56 

57 This utility wraps database operations to eliminate the repetitive pattern of: 

58 1. Get database 

59 2. Check if available 

60 3. Execute operation 

61 4. Handle errors 

62 

63 Args: 

64 operation: Async function that takes database and returns result 

65 error_message: Description of operation for error messages 

66 

67 Returns: 

68 Result from the operation 

69 

70 Raises: 

71 DatabaseUnavailableError: If database is not available 

72 Exception: Any exception from the operation (will be logged) 

73 

74 Example: 

75 >>> async def my_query(db): 

76 ... return await db.search_reflections("test") 

77 >>> results = await safe_database_operation(my_query, "Search reflections") 

78 

79 """ 

80 try: 

81 db = await require_reflection_database() 

82 return await operation(db) 

83 except DatabaseUnavailableError: 

84 # Re-raise database unavailable - caller should handle 

85 raise 

86 except Exception as e: 

87 _get_logger().exception(f"Error in {error_message}: {e}") 

88 raise 

89 

90 

91async def safe_database_operation_with_message[T]( 

92 operation: Callable[[ReflectionDatabaseAdapter], Awaitable[T]], 

93 error_message: str = "Database operation", 

94) -> str: 

95 """Execute database operation and return formatted string result. 

96 

97 Similar to safe_database_operation but catches all exceptions and returns 

98 error messages as strings instead of raising. 

99 

100 Args: 

101 operation: Async function that takes database and returns result 

102 error_message: Description of operation for error messages 

103 

104 Returns: 

105 String result from operation or error message 

106 

107 Example: 

108 >>> async def my_query(db): 

109 ... result = await db.search_reflections("test") 

110 ... return f"Found {len(result)} results" 

111 >>> message = await safe_database_operation_with_message(my_query) 

112 >>> print(message) 

113 

114 """ 

115 try: 

116 db = await require_reflection_database() 

117 result = await operation(db) 

118 # If operation returns a string, return it directly 

119 if isinstance(result, str): 

120 return result 

121 # Otherwise, let caller handle the result 

122 return str(result) 

123 except DatabaseUnavailableError as e: 

124 return f"{e!s}" 

125 except Exception as e: 

126 _get_logger().exception(f"Error in {error_message}: {e}") 

127 return f"{error_message} failed: {e!s}" 

128 

129 

async def batch_database_operation(
    items: list[T],
    operation: Callable[[ReflectionDatabaseAdapter, T], Awaitable[Any]],
    batch_size: int = 100,
) -> list[Any]:
    """Apply ``operation`` to every item, walking the input in fixed-size chunks.

    Items are awaited one at a time, in order; ``batch_size`` only controls
    how the input list is sliced. A failure on one item does not abort the
    run: the exception is logged and ``None`` is stored in that item's slot.

    Args:
        items: List of items to process.
        operation: Async function that takes (database, item) and returns
            a result.
        batch_size: Number of items per chunk. Must be at least 1.

    Returns:
        List of results in the same order as ``items``, with ``None`` in
        the position of every item whose operation raised.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
        DatabaseUnavailableError: If database is not available.

    Example:
        >>> async def store_item(db, item):
        ...     return await db.store_reflection(item["content"], item["tags"])
        >>> items = [{"content": "a", "tags": ["t1"]}, ...]
        >>> results = await batch_database_operation(items, store_item)

    """
    # Validate before touching the database: a negative step would make the
    # range below empty and silently drop every item, and a zero step would
    # only fail later with an opaque range() error.
    if batch_size < 1:
        msg = f"batch_size must be a positive integer, got {batch_size}"
        raise ValueError(msg)

    db = await require_reflection_database()

    results: list[Any] = []
    for start in range(0, len(items), batch_size):
        for item in items[start : start + batch_size]:
            try:
                results.append(await operation(db, item))
            except Exception as e:
                # Best-effort: keep positional alignment with the input by
                # recording None for the failed item and carrying on.
                _get_logger().exception(f"Error processing item {item}: {e}")
                results.append(None)

    return results

176 

177 

def check_database_available() -> bool:
    """Report whether the reflection database modules can be located.

    Synchronous probe intended to run before attempting async database work.
    It only looks up module specs; the target modules themselves are not
    imported (although locating a dotted name does import its parent package).

    Returns:
        True if both ``session_buddy.reflection_tools`` and ``duckdb`` can
        be found, False otherwise (including when the lookup itself raises
        ImportError).

    Example:
        >>> if check_database_available():
        ...     result = await some_database_operation()
        ... else:
        ...     print("Database not available")

    """
    try:
        from importlib.util import find_spec

        # Order matters: all() short-circuits, so duckdb is only probed once
        # the reflection tools module has been found.
        required_modules = ("session_buddy.reflection_tools", "duckdb")
        return all(find_spec(module_name) is not None for module_name in required_modules)
    except ImportError:
        return False

205 

206 

async def get_database_stats() -> dict[str, Any]:
    """Collect reflection database statistics without ever raising.

    Returns:
        On success, the dictionary produced by the adapter's ``get_stats()``
        with an added ``"available": True`` entry. On failure, a fallback
        dictionary with ``"available": False``, an ``"error"`` description,
        and zeroed ``"total_reflections"`` / ``"total_conversations"``.

    Example:
        >>> stats = await get_database_stats()
        >>> print(f"Total reflections: {stats['total_reflections']}")

    """
    try:
        database = await require_reflection_database()
        stats = await database.get_stats()
        # Tagging stays inside the try so a failure here is reported like any
        # other error instead of escaping to the caller.
        stats["available"] = True
        return stats
    except DatabaseUnavailableError:
        failure_reason = "Database not available"
    except Exception as exc:
        _get_logger().exception(f"Error getting database stats: {exc}")
        failure_reason = str(exc)

    # Shared fallback shape for both failure paths.
    return {
        "available": False,
        "error": failure_reason,
        "total_reflections": 0,
        "total_conversations": 0,
    }

233 "available": False, 

234 "error": str(e), 

235 "total_reflections": 0, 

236 "total_conversations": 0, 

237 }