Coverage for src/dataknobs_data/vector/python_vector_search.py: 8%

97 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:23 -0700

1"""Python-based vector search implementation for databases without native vector support.""" 

2 

3import logging 

4from typing import TYPE_CHECKING, Any 

5 

6if TYPE_CHECKING: 

7 from ..records import Record 

8 from .types import VectorSearchResult 

9 

10logger = logging.getLogger(__name__) 

11 

12 

class PythonVectorSearchMixin:
    """Mixin providing Python-based vector similarity search.

    This mixin can be used by database backends that don't have native vector
    search capabilities (like SQLite) to provide vector search functionality
    using Python/NumPy calculations.

    The backend must provide:
    - Fetch methods (looked up by name, ``search`` by default) that accept a
      ``Query`` or the caller's filter object and return an iterable of records
    - The _compute_similarity method (or inherit from a mixin that provides it)

    The async and sync entry points differ only in whether the fetch call is
    awaited; metric/vector normalisation, scoring and ranking are shared
    through the private helpers defined below.
    """

    async def python_vector_search_async(
        self,
        query_vector,
        vector_field: str = "embedding",
        k: int = 10,
        filter=None,
        metric=None,
        fetch_all_method: str = "search",
        fetch_filtered_method: str = "search",
        **kwargs
    ) -> list["VectorSearchResult"]:
        """Perform async vector search using Python calculations.

        Args:
            query_vector: Query vector (converted to a float32 NumPy array
                unless it already is an ndarray)
            vector_field: Name of the vector field to search
            k: Number of results to return; a negative value yields no results
            filter: Optional filter conditions; when truthy it is passed
                unchanged to ``fetch_filtered_method``
            metric: Distance metric to use (a ``DistanceMetric`` or its string
                value); defaults to ``self.vector_metric`` or cosine
            fetch_all_method: Name of the awaitable method called with an
                empty ``Query`` when no filter is given
            fetch_filtered_method: Name of the awaitable method called with
                ``filter``
            **kwargs: Accepted for interface compatibility; not used here

        Returns:
            List of VectorSearchResult objects, highest score first
        """
        from ..query import Query

        query_vector, metric = self._prepare_vector_query(query_vector, metric)

        # Fetch candidates; the only step that differs from the sync variant.
        if filter:
            records = await getattr(self, fetch_filtered_method)(filter)
        else:
            records = await getattr(self, fetch_all_method)(Query())

        results = self._score_records(records, query_vector, vector_field, metric)
        return self._select_top_k(results, k)

    def python_vector_search_sync(
        self,
        query_vector,
        vector_field: str = "embedding",
        k: int = 10,
        filter=None,
        metric=None,
        fetch_all_method: str = "search",
        fetch_filtered_method: str = "search",
        **kwargs
    ) -> list["VectorSearchResult"]:
        """Perform sync vector search using Python calculations.

        Args:
            query_vector: Query vector (converted to a float32 NumPy array
                unless it already is an ndarray)
            vector_field: Name of the vector field to search
            k: Number of results to return; a negative value yields no results
            filter: Optional filter conditions; when truthy it is passed
                unchanged to ``fetch_filtered_method``
            metric: Distance metric to use (a ``DistanceMetric`` or its string
                value); defaults to ``self.vector_metric`` or cosine
            fetch_all_method: Name of the method called with an empty
                ``Query`` when no filter is given
            fetch_filtered_method: Name of the method called with ``filter``
            **kwargs: Accepted for interface compatibility; not used here

        Returns:
            List of VectorSearchResult objects, highest score first
        """
        from ..query import Query

        query_vector, metric = self._prepare_vector_query(query_vector, metric)

        # Fetch candidates; the only step that differs from the async variant.
        if filter:
            records = getattr(self, fetch_filtered_method)(filter)
        else:
            records = getattr(self, fetch_all_method)(Query())

        results = self._score_records(records, query_vector, vector_field, metric)
        return self._select_top_k(results, k)

    def _prepare_vector_query(self, query_vector, metric):
        """Normalise the query vector and metric shared by both entry points.

        Args:
            query_vector: Query vector in any form ``numpy.array`` accepts
            metric: ``DistanceMetric``, its string value, or None

        Returns:
            Tuple ``(query_vector, metric)`` where the vector is an ndarray
            and a string metric has been converted to ``DistanceMetric``
        """
        import numpy as np

        from .types import DistanceMetric

        # Get metric from parameter or instance default
        if metric is None:
            metric = getattr(self, 'vector_metric', DistanceMetric.COSINE)
        if isinstance(metric, str):
            metric = DistanceMetric(metric)

        # An existing ndarray is used as-is (its dtype is not changed).
        if not isinstance(query_vector, np.ndarray):
            query_vector = np.array(query_vector, dtype=np.float32)

        return query_vector, metric

    def _score_records(self, records, query_vector, vector_field: str, metric) -> list["VectorSearchResult"]:
        """Score every fetched record that carries a usable vector.

        Records may be raw dictionaries, ``Record`` objects, or anything else;
        entries whose extracted data is not a dict, or whose ``vector_field``
        is missing or None, are skipped.

        Args:
            records: Iterable of fetched records
            query_vector: Query vector as an ndarray
            vector_field: Name of the vector field to compare against
            metric: Metric passed through to ``_compute_similarity``

        Returns:
            Unsorted list of VectorSearchResult objects
        """
        from ..records import Record
        from .types import VectorSearchResult

        results = []
        for original_record in records:
            # Handle different record formats; the original object is kept so
            # an existing Record can be returned untouched.
            if isinstance(original_record, dict):
                data = self._extract_record_data(original_record)
            elif isinstance(original_record, Record):
                data = original_record.data
            else:
                data = original_record

            # Skip anything without a vector to compare.
            if not isinstance(data, dict) or data.get(vector_field) is None:
                continue

            stored_vector = self._to_vector_array(data[vector_field])
            score = self._compute_similarity(query_vector, stored_vector, metric)

            if isinstance(original_record, Record):
                record = original_record
            else:
                record = self._create_record_from_data(original_record, data)

            results.append(
                VectorSearchResult(
                    record=record,
                    score=float(score),
                    vector_field=vector_field,
                )
            )
        return results

    @staticmethod
    def _to_vector_array(stored_vector):
        """Convert a stored vector value to a NumPy array.

        Args:
            stored_vector: An ndarray, a sequence of numbers, or a VectorField
                dict (as produced by ``to_dict()``) holding the numbers under
                the ``'value'`` key

        Returns:
            The same ndarray if one was given, otherwise a new float32 array
        """
        import numpy as np

        # Handle VectorField dict format (from to_dict())
        if isinstance(stored_vector, dict) and 'value' in stored_vector:
            stored_vector = stored_vector['value']

        if not isinstance(stored_vector, np.ndarray):
            stored_vector = np.array(stored_vector, dtype=np.float32)
        return stored_vector

    @staticmethod
    def _select_top_k(results, k):
        """Return the ``k`` highest-scoring results, best first.

        Args:
            results: Objects exposing a ``score`` attribute
            k: Maximum number of results to return

        Returns:
            New list ordered by descending score; empty when ``k`` is negative
        """
        # A negative k used as a slice bound would drop results from the tail
        # instead of returning nothing, so it is handled explicitly.
        if k is not None and k < 0:
            return []
        ranked = sorted(results, key=lambda item: item.score, reverse=True)
        return ranked[:k]

    def _extract_record_data(self, record_dict: dict[str, Any]) -> dict[str, Any]:
        """Extract the actual data from a record dictionary.

        Handles different storage formats like:
        - Direct data storage
        - Data in a 'data' column (JSON)
        - Double-nested data structures

        Args:
            record_dict: Raw record dictionary from database

        Returns:
            Extracted data dictionary. When the 'data' value is a string that
            is not valid JSON, ``record_dict`` itself is returned (treated as
            direct storage) and a warning is logged, so one malformed row does
            not abort the whole search.
        """
        import json
        import logging

        # Direct storage
        if 'data' not in record_dict:
            return record_dict

        data = record_dict['data']

        # Parse JSON if needed
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except json.JSONDecodeError:
                logging.getLogger(__name__).warning(
                    "Record %r has a 'data' value that is not valid JSON; "
                    "treating the record as direct storage",
                    record_dict.get('id'),
                )
                return record_dict

        # Handle double-nested data structure
        if isinstance(data, dict) and 'data' in data:
            data = data['data']

        return data

    def _create_record_from_data(self, record_dict: dict[str, Any], data: dict[str, Any]) -> "Record":
        """Create a Record object from raw data.

        Args:
            record_dict: Original record dictionary (may contain 'id' and
                'metadata'; metadata may be a JSON string)
            data: Extracted data dictionary

        Returns:
            Record object
        """
        import json

        from ..records import Record

        # Metadata stored as a JSON string is decoded; an empty or undecodable
        # string falls back to an empty dict.
        metadata = record_dict.get('metadata', {})
        if isinstance(metadata, str):
            try:
                metadata = json.loads(metadata) if metadata else {}
            except json.JSONDecodeError:
                metadata = {}

        return Record(data=data, id=record_dict.get('id'), metadata=metadata)

271 

272 return record