Coverage for src/dataknobs_data/vector/stores/common.py: 17%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-26 15:45 -0700

1"""Common base implementation for vector stores.""" 

2 

3from __future__ import annotations 

4 

5import os 

6from pathlib import Path 

7from typing import TYPE_CHECKING, Any, cast 

8 

9from dataknobs_config import ConfigurableBase 

10 

11from ..types import DistanceMetric 

12 

13if TYPE_CHECKING: 

14 import numpy as np 

15 

16 

class VectorStoreBase(ConfigurableBase):
    """Base implementation with common functionality for all vector stores.

    This class provides:
    - Configuration parsing following the database pattern
    - Common parameter extraction
    - Shared utility methods
    """

    # Largest dimensionality accepted by ``_validate_dimensions``. Kept as a
    # class attribute so a backend with a different limit can override it.
    MAX_DIMENSIONS = 65536

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize vector store with configuration.

        Args:
            config: Configuration dictionary with backend-specific parameters.
                ``None`` (or any falsy value) is treated as an empty dict.
        """
        # ConfigurableBase doesn't have __init__, so don't call super().__init__()
        self.config = config or {}
        self._parse_common_config()
        self._parse_backend_config()
        # Set last, after both parse hooks have run.
        # NOTE(review): presumably flipped to True by subclasses once their
        # backend is ready - confirm against the concrete stores.
        self._initialized = False

    def _parse_common_config(self) -> None:
        """Parse common configuration parameters.

        Reads the following keys from ``self.config`` and stores them as
        attributes of the same name:

        - dimensions: Vector dimensions (default ``0``)
        - metric: Distance metric (default ``"cosine"``); strings are
          converted with ``DistanceMetric(...)``, anything else is stored as-is
        - persist_path: Path for persistent storage, with ``~`` expanded;
          ``None`` when missing or empty
        - batch_size: Batch size for operations (default ``100``)
        - index_params: Index-specific parameters (default ``{}``)
        - search_params: Search-specific parameters (default ``{}``)
        - metadata: Any additional metadata (default ``{}``)
        """
        import logging

        # Extract dimensions (required for most stores)
        self.dimensions = self.config.get("dimensions", 0)

        # Extract and parse metric
        metric = self.config.get("metric", "cosine")
        self.metric = DistanceMetric(metric) if isinstance(metric, str) else metric

        # Extract paths and sizes (expand ~ to home directory)
        persist_path = self.config.get("persist_path")
        self.persist_path = Path(persist_path).expanduser() if persist_path else None
        self.batch_size = self.config.get("batch_size", 100)

        # Log how the configured path was resolved. The level check comes
        # first so the filesystem is only touched when the record would
        # actually be emitted.
        logger = logging.getLogger(__name__)
        if persist_path and logger.isEnabledFor(logging.INFO):
            logger.info(
                "VectorStore persist_path: %s -> %s (exists: %s)",
                persist_path,
                self.persist_path,
                os.path.exists(self.persist_path),
            )

        # Extract parameter dictionaries
        self.index_params = self.config.get("index_params", {})
        self.search_params = self.config.get("search_params", {})

        # Store any additional metadata
        self.metadata = self.config.get("metadata", {})

    def _parse_backend_config(self) -> None:
        """Parse backend-specific configuration.

        Override this method in subclasses to handle backend-specific
        parameters. It is called from ``__init__`` right after
        ``_parse_common_config``; the base implementation does nothing.
        """

    def _validate_dimensions(self) -> None:
        """Validate vector dimensions.

        Raises:
            ValueError: If ``self.dimensions`` is not positive or exceeds
                ``MAX_DIMENSIONS``.
        """
        if self.dimensions <= 0:
            raise ValueError(f"Dimensions must be positive, got {self.dimensions}")
        if self.dimensions > self.MAX_DIMENSIONS:
            raise ValueError(
                f"Dimensions {self.dimensions} exceeds maximum ({self.MAX_DIMENSIONS})"
            )

    def _normalize_vector(self, vector: np.ndarray) -> np.ndarray:
        """Normalize a vector for cosine similarity.

        Args:
            vector: Vector to normalize

        Returns:
            The vector divided by its norm, or the input itself (same object)
            when the norm is zero.
        """
        import numpy as np

        norm = np.linalg.norm(vector)
        if norm == 0:
            # A zero vector has no direction; return it untouched rather
            # than dividing by zero.
            return vector
        return vector / norm

    def _calculate_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Calculate similarity between two vectors based on configured metric.

        - COSINE (and any metric not listed below): cosine similarity,
          ``0.0`` if either vector has zero norm
        - EUCLIDEAN / L2: ``1 / (1 + euclidean_distance)``
        - DOT_PRODUCT / INNER_PRODUCT: the raw dot product
        - L1: ``1 / (1 + manhattan_distance)``

        Args:
            vec1: First vector
            vec2: Second vector

        Returns:
            Similarity score, always a plain Python ``float``
        """
        import numpy as np

        if self.metric != DistanceMetric.COSINE:
            if self.metric in (DistanceMetric.EUCLIDEAN, DistanceMetric.L2):
                # Convert distance to similarity
                distance = float(np.linalg.norm(vec1 - vec2))
                return 1.0 / (1.0 + distance)

            if self.metric in (DistanceMetric.DOT_PRODUCT, DistanceMetric.INNER_PRODUCT):
                return float(np.dot(vec1, vec2))

            if self.metric == DistanceMetric.L1:
                # Manhattan distance to similarity. float() keeps the result
                # a Python float instead of a NumPy scalar.
                distance = float(np.sum(np.abs(vec1 - vec2)))
                return 1.0 / (1.0 + distance)

        # Cosine similarity: used for COSINE and as the default for any
        # metric not handled above.
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(vec1, vec2) / (norm1 * norm2))

    def _convert_distance_to_score(self, distance: float) -> float:
        """Convert a distance to a similarity score based on metric.

        Args:
            distance: Distance value

        Returns:
            Similarity score (higher is more similar)
        """
        if self.metric == DistanceMetric.COSINE:
            # Cosine distance is 1 - similarity
            return 1.0 - distance
        if self.metric in (
            DistanceMetric.EUCLIDEAN,
            DistanceMetric.L2,
            DistanceMetric.L1,
        ):
            # Euclidean and Manhattan distances share the same mapping
            return 1.0 / (1.0 + distance)
        # For dot product and others, higher is better: pass through unchanged
        return distance

    def _prepare_vector(self, vector: np.ndarray | list[float] | list[np.ndarray], normalize: bool = False) -> np.ndarray:
        """Prepare a vector for storage or search.

        The input is converted to a ``float32`` array, a 1-D input is
        reshaped to a single row, and rows are scaled to unit length when
        requested or when the configured metric is cosine.

        Args:
            vector: Input vector (numpy array, list of floats, or list of arrays)
            normalize: Whether to normalize for cosine similarity. Rows are
                always normalized when ``self.metric`` is COSINE, regardless
                of this flag.

        Returns:
            Prepared numpy array
        """
        import numpy as np

        # Convert to numpy array
        if isinstance(vector, list):
            if vector and isinstance(vector[0], np.ndarray):
                # List of arrays - stack them
                arr = np.vstack(vector).astype(np.float32)
            else:
                # List of floats
                arr = np.array(vector, dtype=np.float32)
        else:
            arr = np.asarray(vector, dtype=np.float32)

        # Ensure correct shape
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)

        # Normalize if needed (e.g., for cosine similarity)
        if normalize or self.metric == DistanceMetric.COSINE:
            norms = np.linalg.norm(arr, axis=1, keepdims=True)
            norms[norms == 0] = 1  # Avoid division by zero
            arr = arr / norms

        return cast("np.ndarray", arr)

    def _apply_metadata_filter(self, candidates: list[tuple[Any, dict]], filter: dict[str, Any]) -> list[tuple[Any, dict]]:
        """Apply metadata filter to candidates.

        A candidate is kept only when every key in ``filter`` maps to an
        equal value in its metadata. Lookups use ``dict.get``, so a filter
        value of ``None`` also matches candidates that lack the key.

        Args:
            candidates: List of (id, metadata) tuples
            filter: Filter criteria as key-value pairs; when empty or
                ``None`` the input list is returned unchanged

        Returns:
            Filtered list of candidates
        """
        if not filter:
            return candidates

        return [
            (item_id, metadata)
            for item_id, metadata in candidates
            if all(metadata.get(key) == value for key, value in filter.items())
        ]

    def __repr__(self) -> str:
        """String representation."""
        # ``metric`` may be a non-enum object (non-string config values are
        # stored as-is), so fall back to the object itself when it has no
        # ``value`` attribute.
        metric_label = getattr(self.metric, "value", self.metric)
        return (
            f"{self.__class__.__name__}("
            f"dimensions={self.dimensions}, "
            f"metric={metric_label})"
        )

121 import numpy as np 

122 

123 if self.metric == DistanceMetric.COSINE: 

124 # Cosine similarity 

125 norm1 = np.linalg.norm(vec1) 

126 norm2 = np.linalg.norm(vec2) 

127 if norm1 == 0 or norm2 == 0: 

128 return 0.0 

129 return float(np.dot(vec1, vec2) / (norm1 * norm2)) 

130 

131 elif self.metric in (DistanceMetric.EUCLIDEAN, DistanceMetric.L2): 

132 # Convert distance to similarity 

133 distance = float(np.linalg.norm(vec1 - vec2)) 

134 return 1.0 / (1.0 + distance) 

135 

136 elif self.metric in (DistanceMetric.DOT_PRODUCT, DistanceMetric.INNER_PRODUCT): 

137 # Dot product 

138 return float(np.dot(vec1, vec2)) 

139 

140 elif self.metric == DistanceMetric.L1: 

141 # Manhattan distance to similarity 

142 distance = np.sum(np.abs(vec1 - vec2)) 

143 return 1.0 / (1.0 + distance) 

144 

145 else: 

146 # Default to cosine 

147 norm1 = np.linalg.norm(vec1) 

148 norm2 = np.linalg.norm(vec2) 

149 if norm1 == 0 or norm2 == 0: 

150 return 0.0 

151 return float(np.dot(vec1, vec2) / (norm1 * norm2)) 

152 

153 def _convert_distance_to_score(self, distance: float) -> float: 

154 """Convert a distance to a similarity score based on metric. 

155  

156 Args: 

157 distance: Distance value 

158  

159 Returns: 

160 Similarity score (higher is more similar) 

161 """ 

162 if self.metric == DistanceMetric.COSINE: 

163 # Cosine distance is 1 - similarity 

164 return 1.0 - distance 

165 elif self.metric in (DistanceMetric.EUCLIDEAN, DistanceMetric.L2): 

166 # Convert distance to similarity 

167 return 1.0 / (1.0 + distance) 

168 elif self.metric == DistanceMetric.L1: 

169 # Manhattan distance to similarity 

170 return 1.0 / (1.0 + distance) 

171 else: 

172 # For dot product and others, higher is better 

173 return distance 

174 

175 def _prepare_vector(self, vector: np.ndarray | list[float] | list[np.ndarray], normalize: bool = False) -> np.ndarray: 

176 """Prepare a vector for storage or search. 

177  

178 Args: 

179 vector: Input vector (numpy array, list of floats, or list of arrays) 

180 normalize: Whether to normalize for cosine similarity 

181  

182 Returns: 

183 Prepared numpy array 

184 """ 

185 import numpy as np 

186 

187 # Convert to numpy array 

188 if isinstance(vector, list): 

189 if len(vector) > 0 and isinstance(vector[0], np.ndarray): 

190 # List of arrays - stack them 

191 vector = np.vstack(vector).astype(np.float32) 

192 else: 

193 # List of floats 

194 vector = np.array(vector, dtype=np.float32) 

195 else: 

196 vector = np.asarray(vector, dtype=np.float32) 

197 

198 # Ensure vector is an ndarray at this point 

199 assert isinstance(vector, np.ndarray) 

200 

201 # Ensure correct shape 

202 if vector.ndim == 1: 

203 vector = vector.reshape(1, -1) 

204 

205 # Normalize if needed (e.g., for cosine similarity) 

206 if normalize or self.metric == DistanceMetric.COSINE: 

207 # Apply normalization for cosine similarity 

208 norms = np.linalg.norm(vector, axis=1, keepdims=True) 

209 norms[norms == 0] = 1 # Avoid division by zero 

210 vector = vector / norms 

211 

212 return cast("np.ndarray", vector) 

213 

214 def _apply_metadata_filter(self, candidates: list[tuple[Any, dict]], filter: dict[str, Any]) -> list[tuple[Any, dict]]: 

215 """Apply metadata filter to candidates. 

216  

217 Args: 

218 candidates: List of (id, metadata) tuples 

219 filter: Filter criteria as key-value pairs 

220  

221 Returns: 

222 Filtered list of candidates 

223 """ 

224 if not filter: 

225 return candidates 

226 

227 filtered = [] 

228 for item_id, metadata in candidates: 

229 # Check if all filter conditions match 

230 match = all( 

231 metadata.get(key) == value 

232 for key, value in filter.items() 

233 ) 

234 if match: 

235 filtered.append((item_id, metadata)) 

236 

237 return filtered 

238 

239 def __repr__(self) -> str: 

240 """String representation.""" 

241 return ( 

242 f"{self.__class__.__name__}(" 

243 f"dimensions={self.dimensions}, " 

244 f"metric={self.metric.value})" 

245 )