Coverage for src/dataknobs_data/vector/elasticsearch_utils.py: 0%
73 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-31 15:06 -0600
1"""Elasticsearch-specific vector utilities."""
3from __future__ import annotations
5import logging
6from typing import Any
8import numpy as np
10from .types import DistanceMetric
12logger = logging.getLogger(__name__)
def get_similarity_for_metric(metric: DistanceMetric) -> str:
    """Map a distance metric onto its Elasticsearch similarity name.

    Args:
        metric: Distance metric

    Returns:
        Elasticsearch similarity function name ("cosine" is used as the
        fallback for unrecognized metrics)
    """
    # Unknown metrics intentionally fall back to cosine similarity.
    similarity = {
        DistanceMetric.COSINE: "cosine",
        DistanceMetric.DOT_PRODUCT: "dot_product",
        DistanceMetric.EUCLIDEAN: "l2_norm",
        DistanceMetric.INNER_PRODUCT: "dot_product",
    }.get(metric, "cosine")
    logger.debug(f"Using similarity '{similarity}' for metric {metric}")
    return similarity
36def build_knn_query(
37 query_vector: np.ndarray | list[float],
38 field_name: str,
39 k: int = 10,
40 num_candidates: int | None = None,
41 filter_query: dict[str, Any] | None = None,
42) -> dict[str, Any]:
43 """Build a KNN query for Elasticsearch.
45 Args:
46 query_vector: Query vector
47 field_name: Name of the vector field (will be prefixed with 'data.')
48 k: Number of results to return
49 num_candidates: Number of candidates to consider (default: k * 10)
50 filter_query: Optional filter query
52 Returns:
53 Elasticsearch KNN query
54 """
55 # Convert numpy array to list if needed
56 if isinstance(query_vector, np.ndarray):
57 query_vector = query_vector.tolist()
59 # Default num_candidates if not specified
60 if num_candidates is None:
61 num_candidates = max(k * 10, 100)
63 # Build the KNN query
64 knn_query = {
65 "field": f"data.{field_name}",
66 "query_vector": query_vector,
67 "k": k,
68 "num_candidates": num_candidates,
69 }
71 # Add filter if provided
72 if filter_query:
73 knn_query["filter"] = filter_query
75 return {"knn": knn_query}
def build_script_score_query(
    query_vector: np.ndarray | list[float],
    field_name: str,
    metric: DistanceMetric = DistanceMetric.COSINE,
    filter_query: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build a script_score query for exact vector search.

    Args:
        query_vector: Query vector
        field_name: Name of the vector field
        metric: Distance metric to use
        filter_query: Optional filter query

    Returns:
        Elasticsearch script_score query
    """
    # Elasticsearch expects a plain list of floats, not an ndarray.
    if isinstance(query_vector, np.ndarray):
        query_vector = query_vector.tolist()

    field_path = f"data.{field_name}"

    # Painless scoring scripts keyed by metric; cosine shifted by +1.0 to
    # keep scores non-negative, l2norm inverted so smaller distance = higher score.
    cosine_script = f"cosineSimilarity(params.query_vector, '{field_path}') + 1.0"
    script_source = {
        DistanceMetric.COSINE: cosine_script,
        DistanceMetric.DOT_PRODUCT: f"dotProduct(params.query_vector, '{field_path}')",
        DistanceMetric.INNER_PRODUCT: f"dotProduct(params.query_vector, '{field_path}')",
        DistanceMetric.EUCLIDEAN: f"1 / (1 + l2norm(params.query_vector, '{field_path}'))",
    }.get(metric, cosine_script)  # unknown metrics default to cosine

    return {
        "script_score": {
            # Without an explicit filter, score every document.
            "query": filter_query if filter_query else {"match_all": {}},
            "script": {
                "source": script_source,
                "params": {"query_vector": query_vector},
            },
        }
    }
128def build_hybrid_query(
129 text_query: str,
130 query_vector: np.ndarray | list[float],
131 text_fields: list[str],
132 vector_field: str,
133 text_boost: float = 1.0,
134 vector_boost: float = 1.0,
135 k: int = 10,
136) -> dict[str, Any]:
137 """Build a hybrid text + vector search query.
139 Args:
140 text_query: Text query string
141 query_vector: Query vector
142 text_fields: Fields to search for text
143 vector_field: Vector field name
144 text_boost: Boost for text search
145 vector_boost: Boost for vector search
146 k: Number of results for KNN
148 Returns:
149 Elasticsearch hybrid query
150 """
151 # Convert numpy array to list if needed
152 if isinstance(query_vector, np.ndarray):
153 query_vector = query_vector.tolist()
155 # Build text query
156 text_query_clause = {
157 "multi_match": {
158 "query": text_query,
159 "fields": [f"data.{field}" for field in text_fields],
160 "boost": text_boost,
161 }
162 }
164 # Build KNN query
165 knn_clause = {
166 "field": f"data.{vector_field}",
167 "query_vector": query_vector,
168 "k": k,
169 "boost": vector_boost,
170 }
172 # Combine with bool query
173 return {
174 "bool": {
175 "should": [text_query_clause],
176 },
177 "knn": knn_clause,
178 }
181def format_vector_for_elasticsearch(vector: np.ndarray | list[float]) -> list[float]:
182 """Format a vector for Elasticsearch storage.
184 Args:
185 vector: Vector to format
187 Returns:
188 List of floats suitable for Elasticsearch
189 """
190 if isinstance(vector, np.ndarray):
191 # Convert to list and ensure float32
192 result: list[float] = vector.astype(np.float32).tolist()
193 return result
194 elif isinstance(vector, list):
195 # Ensure all values are floats
196 return [float(v) for v in vector]
197 else:
198 raise ValueError(f"Unsupported vector type: {type(vector)}")
201def parse_elasticsearch_vector(value: Any) -> np.ndarray | None:
202 """Parse a vector value from Elasticsearch.
204 Args:
205 value: Value from Elasticsearch document
207 Returns:
208 Numpy array or None
209 """
210 if value is None:
211 return None
213 if isinstance(value, (list, tuple)):
214 return np.array(value, dtype=np.float32)
215 elif isinstance(value, np.ndarray):
216 return value.astype(np.float32)
217 else:
218 logger.warning(f"Unexpected vector value type: {type(value)}")
219 return None
def get_vector_mapping(
    dimensions: int,
    similarity: str = "cosine",
    index: bool = True,
) -> dict[str, Any]:
    """Get Elasticsearch mapping for a vector field.

    Args:
        dimensions: Number of dimensions
        similarity: Similarity metric (cosine, dot_product, l2_norm)
        index: Whether to index the field for KNN search

    Returns:
        Mapping dictionary for the field
    """
    # dense_vector is the Elasticsearch field type for KNN-searchable vectors.
    mapping: dict[str, Any] = {"type": "dense_vector"}
    mapping["dims"] = dimensions
    mapping["index"] = index
    mapping["similarity"] = similarity
    return mapping
def estimate_index_parameters(num_vectors: int) -> dict[str, Any]:
    """Estimate optimal index parameters based on dataset size.

    Args:
        num_vectors: Expected number of vectors

    Returns:
        Dictionary of index parameters
    """
    # HNSW tuning by dataset size: small sets favor recall (high
    # ef_construction), large sets favor indexing speed (lower m/ef).
    if num_vectors < 10000:
        ef_construction, m = 200, 16
    elif num_vectors < 100000:
        ef_construction, m = 100, 16
    else:
        ef_construction, m = 50, 8

    return {
        "index.knn": True,
        "index.knn.algo_param.ef_construction": ef_construction,
        "index.knn.algo_param.m": m,
    }
278def validate_vector_dimensions(vector: np.ndarray | list[float], expected_dims: int) -> bool:
279 """Validate that a vector has the expected dimensions.
281 Args:
282 vector: Vector to validate
283 expected_dims: Expected number of dimensions
285 Returns:
286 True if dimensions match
287 """
288 if isinstance(vector, np.ndarray):
289 actual_dims = vector.shape[0] if vector.ndim == 1 else vector.shape[-1]
290 elif isinstance(vector, list):
291 actual_dims = len(vector)
292 else:
293 # Unsupported vector type
294 return False # type: ignore[unreachable]
296 if actual_dims != expected_dims:
297 logger.warning(f"Vector dimension mismatch: expected {expected_dims}, got {actual_dims}")
298 return False
300 return True