Coverage for src / dataknobs_data / vector / elasticsearch_utils.py: 0%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-26 15:45 -0700

1"""Elasticsearch-specific vector utilities.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from typing import Any 

7 

8import numpy as np 

9 

10from .types import DistanceMetric 

11 

12logger = logging.getLogger(__name__) 

13 

14 

def get_similarity_for_metric(metric: DistanceMetric) -> str:
    """Translate a distance metric into the Elasticsearch similarity name.

    Args:
        metric: Distance metric to translate.

    Returns:
        Elasticsearch similarity function name; unrecognized metrics
        fall back to "cosine".
    """
    metric_to_similarity = {
        DistanceMetric.COSINE: "cosine",
        DistanceMetric.DOT_PRODUCT: "dot_product",
        DistanceMetric.EUCLIDEAN: "l2_norm",
        DistanceMetric.INNER_PRODUCT: "dot_product",
    }
    # Unknown metrics default to cosine rather than raising.
    similarity = metric_to_similarity.get(metric, "cosine")
    logger.debug(f"Using similarity '{similarity}' for metric {metric}")
    return similarity

34 

35 

36def build_knn_query( 

37 query_vector: np.ndarray | list[float], 

38 field_name: str, 

39 k: int = 10, 

40 num_candidates: int | None = None, 

41 filter_query: dict[str, Any] | None = None, 

42) -> dict[str, Any]: 

43 """Build a KNN query for Elasticsearch. 

44  

45 Args: 

46 query_vector: Query vector 

47 field_name: Name of the vector field (will be prefixed with 'data.') 

48 k: Number of results to return 

49 num_candidates: Number of candidates to consider (default: k * 10) 

50 filter_query: Optional filter query 

51  

52 Returns: 

53 Elasticsearch KNN query 

54 """ 

55 # Convert numpy array to list if needed 

56 if isinstance(query_vector, np.ndarray): 

57 query_vector = query_vector.tolist() 

58 

59 # Default num_candidates if not specified 

60 if num_candidates is None: 

61 num_candidates = max(k * 10, 100) 

62 

63 # Build the KNN query 

64 knn_query = { 

65 "field": f"data.{field_name}", 

66 "query_vector": query_vector, 

67 "k": k, 

68 "num_candidates": num_candidates, 

69 } 

70 

71 # Add filter if provided 

72 if filter_query: 

73 knn_query["filter"] = filter_query 

74 

75 return {"knn": knn_query} 

76 

77 

def build_script_score_query(
    query_vector: np.ndarray | list[float],
    field_name: str,
    metric: DistanceMetric = DistanceMetric.COSINE,
    filter_query: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build a script_score query for exact (brute-force) vector search.

    Args:
        query_vector: Query vector
        field_name: Name of the vector field (prefixed with 'data.')
        metric: Distance metric used for scoring
        filter_query: Optional filter restricting the scored documents

    Returns:
        Elasticsearch script_score query
    """
    if isinstance(query_vector, np.ndarray):
        query_vector = query_vector.tolist()

    field_path = f"data.{field_name}"

    # Elasticsearch scores must be non-negative, hence the +1.0 offset for
    # cosine and the 1/(1+d) inversion for euclidean distance.
    if metric in (DistanceMetric.DOT_PRODUCT, DistanceMetric.INNER_PRODUCT):
        script_source = f"dotProduct(params.query_vector, '{field_path}')"
    elif metric == DistanceMetric.EUCLIDEAN:
        script_source = f"1 / (1 + l2norm(params.query_vector, '{field_path}'))"
    else:
        # COSINE and any unrecognized metric both score with cosine similarity.
        script_source = f"cosineSimilarity(params.query_vector, '{field_path}') + 1.0"

    return {
        "script_score": {
            # With no filter, score every document.
            "query": filter_query or {"match_all": {}},
            "script": {
                "source": script_source,
                "params": {"query_vector": query_vector},
            },
        }
    }

126 

127 

128def build_hybrid_query( 

129 text_query: str, 

130 query_vector: np.ndarray | list[float], 

131 text_fields: list[str], 

132 vector_field: str, 

133 text_boost: float = 1.0, 

134 vector_boost: float = 1.0, 

135 k: int = 10, 

136) -> dict[str, Any]: 

137 """Build a hybrid text + vector search query. 

138  

139 Args: 

140 text_query: Text query string 

141 query_vector: Query vector 

142 text_fields: Fields to search for text 

143 vector_field: Vector field name 

144 text_boost: Boost for text search 

145 vector_boost: Boost for vector search 

146 k: Number of results for KNN 

147  

148 Returns: 

149 Elasticsearch hybrid query 

150 """ 

151 # Convert numpy array to list if needed 

152 if isinstance(query_vector, np.ndarray): 

153 query_vector = query_vector.tolist() 

154 

155 # Build text query 

156 text_query_clause = { 

157 "multi_match": { 

158 "query": text_query, 

159 "fields": [f"data.{field}" for field in text_fields], 

160 "boost": text_boost, 

161 } 

162 } 

163 

164 # Build KNN query 

165 knn_clause = { 

166 "field": f"data.{vector_field}", 

167 "query_vector": query_vector, 

168 "k": k, 

169 "boost": vector_boost, 

170 } 

171 

172 # Combine with bool query 

173 return { 

174 "bool": { 

175 "should": [text_query_clause], 

176 }, 

177 "knn": knn_clause, 

178 } 

179 

180 

181def format_vector_for_elasticsearch(vector: np.ndarray | list[float]) -> list[float]: 

182 """Format a vector for Elasticsearch storage. 

183  

184 Args: 

185 vector: Vector to format 

186  

187 Returns: 

188 List of floats suitable for Elasticsearch 

189 """ 

190 if isinstance(vector, np.ndarray): 

191 # Convert to list and ensure float32 

192 result: list[float] = vector.astype(np.float32).tolist() 

193 return result 

194 elif isinstance(vector, list): 

195 # Ensure all values are floats 

196 return [float(v) for v in vector] 

197 else: 

198 raise ValueError(f"Unsupported vector type: {type(vector)}") 

199 

200 

201def parse_elasticsearch_vector(value: Any) -> np.ndarray | None: 

202 """Parse a vector value from Elasticsearch. 

203  

204 Args: 

205 value: Value from Elasticsearch document 

206  

207 Returns: 

208 Numpy array or None 

209 """ 

210 if value is None: 

211 return None 

212 

213 if isinstance(value, (list, tuple)): 

214 return np.array(value, dtype=np.float32) 

215 elif isinstance(value, np.ndarray): 

216 return value.astype(np.float32) 

217 else: 

218 logger.warning(f"Unexpected vector value type: {type(value)}") 

219 return None 

220 

221 

def get_vector_mapping(
    dimensions: int,
    similarity: str = "cosine",
    index: bool = True,
) -> dict[str, Any]:
    """Build the Elasticsearch field mapping for a dense_vector field.

    Args:
        dimensions: Number of dimensions
        similarity: Similarity metric (cosine, dot_product, l2_norm)
        index: Whether to index the field for KNN search

    Returns:
        Mapping dictionary for the field
    """
    mapping: dict[str, Any] = {
        "type": "dense_vector",
        "dims": dimensions,
        "index": index,
        "similarity": similarity,
    }
    return mapping

243 

244 

def estimate_index_parameters(num_vectors: int) -> dict[str, Any]:
    """Estimate optimal index parameters based on dataset size.

    NOTE(review): the "index.knn.algo_param.*" setting names follow the
    OpenSearch convention — confirm the target cluster accepts them.

    Args:
        num_vectors: Expected number of vectors

    Returns:
        Dictionary of index parameters
    """
    # HNSW tuning: bigger corpora trade construction accuracy for speed.
    if num_vectors < 10000:
        # Small dataset - prioritize accuracy
        ef_construction, m = 200, 16
    elif num_vectors < 100000:
        # Medium dataset - balance
        ef_construction, m = 100, 16
    else:
        # Large dataset - prioritize speed
        ef_construction, m = 50, 8

    return {
        "index.knn": True,
        "index.knn.algo_param.ef_construction": ef_construction,
        "index.knn.algo_param.m": m,
    }

276 

277 

278def validate_vector_dimensions(vector: np.ndarray | list[float], expected_dims: int) -> bool: 

279 """Validate that a vector has the expected dimensions. 

280  

281 Args: 

282 vector: Vector to validate 

283 expected_dims: Expected number of dimensions 

284  

285 Returns: 

286 True if dimensions match 

287 """ 

288 if isinstance(vector, np.ndarray): 

289 actual_dims = vector.shape[0] if vector.ndim == 1 else vector.shape[-1] 

290 elif isinstance(vector, list): 

291 actual_dims = len(vector) 

292 else: 

293 # Unsupported vector type 

294 return False # type: ignore[unreachable] 

295 

296 if actual_dims != expected_dims: 

297 logger.warning(f"Vector dimension mismatch: expected {expected_dims}, got {actual_dims}") 

298 return False 

299 

300 return True