Coverage for src/dataknobs_data/vector/operations.py: 11%

91 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-29 14:14 -0600

1"""Base vector operations and utilities.""" 

2 

3from __future__ import annotations 

4 

5from typing import TYPE_CHECKING, Any 

6 

7if TYPE_CHECKING: 

8 import numpy as np 

9 

10from .types import DistanceMetric 

11 

12 

def normalize_vector(vector: np.ndarray) -> np.ndarray:
    """Scale a vector to unit length.

    Args:
        vector: Vector to normalize

    Returns:
        Unit-length vector, or the input unchanged if it has zero magnitude
    """
    import numpy as np

    magnitude = np.linalg.norm(vector)
    # A zero vector has no direction; return it untouched rather than divide by 0.
    return vector if magnitude == 0 else vector / magnitude

28 

29 

def compute_distance(
    vec1: np.ndarray,
    vec2: np.ndarray,
    metric: DistanceMetric,
) -> float:
    """Compute the distance between two vectors under the given metric.

    Args:
        vec1: First vector
        vec2: Second vector
        metric: Distance metric to use

    Returns:
        Distance value (smaller means more similar; dot-product metrics
        are negated so this ordering still holds)

    Raises:
        ValueError: If the metric is not recognized
    """
    import numpy as np

    if metric == DistanceMetric.COSINE:
        n1 = np.linalg.norm(vec1)
        n2 = np.linalg.norm(vec2)
        # Zero-length vectors have no direction; treat as maximally distant.
        if n1 == 0 or n2 == 0:
            return 1.0
        # Cosine distance = 1 - cosine similarity.
        return float(1 - np.dot(vec1, vec2) / (n1 * n2))

    if metric in (DistanceMetric.EUCLIDEAN, DistanceMetric.L2):
        return float(np.linalg.norm(vec1 - vec2))

    if metric in (DistanceMetric.DOT_PRODUCT, DistanceMetric.INNER_PRODUCT):
        # Negate so larger dot products sort as smaller distances.
        return float(-np.dot(vec1, vec2))

    if metric == DistanceMetric.L1:
        # Manhattan distance: sum of absolute coordinate differences.
        return float(np.sum(np.abs(vec1 - vec2)))

    raise ValueError(f"Unknown distance metric: {metric}")

71 

72 

def compute_similarity(
    vec1: np.ndarray,
    vec2: np.ndarray,
    metric: DistanceMetric,
) -> float:
    """Compute similarity between two vectors (higher is more similar).

    Args:
        vec1: First vector
        vec2: Second vector
        metric: Distance metric to use

    Returns:
        Similarity value; distance-based metrics are mapped to
        similarity via 1 / (1 + distance)

    Raises:
        ValueError: If the metric is not recognized
    """
    import numpy as np

    if metric == DistanceMetric.COSINE:
        n1 = np.linalg.norm(vec1)
        n2 = np.linalg.norm(vec2)
        # A zero vector has no direction: define its similarity as 0.
        if n1 == 0 or n2 == 0:
            return 0.0
        return float(np.dot(vec1, vec2) / (n1 * n2))

    if metric in (DistanceMetric.EUCLIDEAN, DistanceMetric.L2):
        # Map unbounded distance into (0, 1], with 1 meaning identical.
        return float(1.0 / (1.0 + np.linalg.norm(vec1 - vec2)))

    if metric in (DistanceMetric.DOT_PRODUCT, DistanceMetric.INNER_PRODUCT):
        # Dot product is already a similarity score.
        return float(np.dot(vec1, vec2))

    if metric == DistanceMetric.L1:
        # Same inverse mapping, using Manhattan distance.
        return float(1.0 / (1.0 + np.sum(np.abs(vec1 - vec2))))

    raise ValueError(f"Unknown distance metric: {metric}")

115 

116 

def batch_compute_distances(
    query_vector: np.ndarray,
    vectors: np.ndarray,
    metric: DistanceMetric,
) -> np.ndarray:
    """Compute distances from a query vector to each row of a matrix.

    Args:
        query_vector: Query vector (1D)
        vectors: Matrix of vectors (2D); a 1D input is treated as one row
        metric: Distance metric to use

    Returns:
        Array of distances, one per row of ``vectors``

    Raises:
        ValueError: If the metric is not recognized
    """
    import numpy as np

    # Promote a single vector to a one-row matrix.
    if vectors.ndim == 1:
        vectors = vectors.reshape(1, -1)

    if metric == DistanceMetric.COSINE:
        dots = vectors @ query_vector
        query_norm = np.linalg.norm(query_vector)
        row_norms = np.linalg.norm(vectors, axis=1)

        # Rows (or a query) with zero norm keep similarity 0 to avoid
        # dividing by zero; their distance therefore becomes 1.
        valid = (row_norms != 0) & (query_norm != 0)
        similarities = np.zeros(len(vectors))
        similarities[valid] = dots[valid] / (row_norms[valid] * query_norm)

        return 1 - similarities

    if metric in (DistanceMetric.EUCLIDEAN, DistanceMetric.L2):
        return np.linalg.norm(vectors - query_vector, axis=1)

    if metric in (DistanceMetric.DOT_PRODUCT, DistanceMetric.INNER_PRODUCT):
        # Negated so larger dot products sort as smaller distances.
        return -(vectors @ query_vector)

    if metric == DistanceMetric.L1:
        return np.sum(np.abs(vectors - query_vector), axis=1)

    raise ValueError(f"Unknown distance metric: {metric}")

165 

166 

def validate_vector_dimensions(
    vector: np.ndarray | list[float] | tuple[float, ...],
    expected_dims: int,
    field_name: str | None = None,
) -> np.ndarray:
    """Validate and convert a vector to a properly-sized numpy array.

    Args:
        vector: Vector to validate; lists and tuples are converted to
            float32 numpy arrays
        expected_dims: Expected number of dimensions
        field_name: Optional field name for error messages

    Returns:
        Validated numpy array

    Raises:
        ValueError: If the last-axis size does not match ``expected_dims``
    """
    import numpy as np

    # Accept any plain Python sequence of floats, not just lists.
    if isinstance(vector, (list, tuple)):
        vector = np.array(vector, dtype=np.float32)

    # For a batch (2D+) input, validate the trailing (feature) axis.
    actual_dims = len(vector) if vector.ndim == 1 else vector.shape[-1]

    if actual_dims != expected_dims:
        field_str = f" for field '{field_name}'" if field_name else ""
        raise ValueError(
            f"Vector dimension mismatch{field_str}: "
            f"expected {expected_dims}, got {actual_dims}"
        )

    return vector

200 

201 

def chunk_vectors(
    vectors: np.ndarray | list[np.ndarray],
    chunk_size: int,
) -> list[np.ndarray]:
    """Split vectors into chunks for batch processing.

    Args:
        vectors: Vectors to chunk (2D array, or list of individual vectors)
        chunk_size: Maximum chunk size; must be at least 1

    Returns:
        List of vector chunks; the final chunk may be smaller than
        ``chunk_size``

    Raises:
        ValueError: If ``chunk_size`` is less than 1
    """
    import numpy as np

    # Guard explicitly: chunk_size == 0 would raise a cryptic range() error
    # and a negative value would silently return an empty list.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    if isinstance(vectors, list):
        # List input: stack each slice of vectors into a single array.
        return [
            np.array(vectors[i:i + chunk_size])
            for i in range(0, len(vectors), chunk_size)
        ]

    # Array input: plain slicing yields views with no copying.
    return [vectors[i:i + chunk_size] for i in range(0, len(vectors), chunk_size)]

230 

231 

def estimate_memory_usage(
    num_vectors: int,
    dimensions: int,
    dtype: str = "float32",
) -> dict[str, Any]:
    """Estimate memory usage for vector storage.

    Args:
        num_vectors: Number of vectors
        dimensions: Number of dimensions per vector
        dtype: Data type for vectors (unknown types assume 4 bytes)

    Returns:
        Dictionary with memory estimates in MB (and total in GB)
    """
    dtype_sizes = {
        "float16": 2,
        "float32": 4,
        "float64": 8,
        "int8": 1,
        "uint8": 1,
    }
    # Unknown dtypes fall back to 4 bytes (float32-equivalent).
    element_size = dtype_sizes.get(dtype, 4)

    raw_bytes = num_vectors * dimensions * element_size
    index_bytes = raw_bytes * 0.1  # ~10% for basic index
    metadata_bytes = num_vectors * 100  # ~100 bytes per vector for metadata
    total_bytes = raw_bytes + index_bytes + metadata_bytes

    mb = 1024 * 1024
    return {
        "vector_storage_mb": raw_bytes / mb,
        "index_overhead_mb": index_bytes / mb,
        "metadata_overhead_mb": metadata_bytes / mb,
        "total_mb": total_bytes / mb,
        "total_gb": total_bytes / (mb * 1024),
    }