Coverage for src/dataknobs_data/vector/stores/memory.py: 16%

126 statements  


1"""In-memory vector store implementation.""" 

2 

3from __future__ import annotations 

4 

5import os 

6import pickle 

7from typing import Any 

8from uuid import uuid4 

9 

10import numpy as np 

11 

12from .base import VectorStore 

13 

14 

class MemoryVectorStore(VectorStore):
    """Simple in-memory vector store for testing and development.

    This implementation stores vectors in memory using numpy arrays
    and performs brute-force search. Suitable for small datasets
    and testing scenarios.
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize memory vector store."""
        super().__init__(config)
        self.vectors: dict[str, np.ndarray] = {}  # id -> vector
        self.metadata_store: dict[str, dict[str, Any]] = {}  # id -> metadata
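    # The two dicts above are kept key-synchronized: every id present in
    # self.vectors gets an entry in self.metadata_store (possibly {}), and
    # delete_vectors removes from both.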

    async def initialize(self) -> None:
        """Initialize the store."""
        if self._initialized:
            return

        # Load existing data if persist path exists
        if self.persist_path and os.path.exists(self.persist_path):
            await self.load()

        self._initialized = True

    async def close(self) -> None:
        """Save and close the store."""
        if self.persist_path and self._initialized:
            await self.save()
        self._initialized = False

    async def save(self) -> None:
        """Save vectors and metadata to disk."""
        if not self.persist_path:
            return

        # Create the parent directory if needed; guard against a bare
        # filename, for which os.path.dirname() returns "" and
        # os.makedirs("") would raise
        parent = os.path.dirname(self.persist_path)
        if parent:
            os.makedirs(parent, exist_ok=True)

        # Save all data (vectors as plain lists so the pickle stays portable)
        with open(self.persist_path, "wb") as f:
            pickle.dump({
                "vectors": {k: v.tolist() for k, v in self.vectors.items()},
                "metadata_store": self.metadata_store,
                "config": {
                    "dimensions": self.dimensions,
                    "metric": self.metric.value if hasattr(self.metric, "value") else str(self.metric),
                },
            }, f)

    async def load(self) -> None:
        """Load vectors and metadata from disk."""
        if not self.persist_path or not os.path.exists(self.persist_path):
            return

        with open(self.persist_path, "rb") as f:
            data = pickle.load(f)
            # Convert lists back to numpy arrays
            self.vectors = {k: np.array(v, dtype=np.float32) for k, v in data["vectors"].items()}
            self.metadata_store = data["metadata_store"]
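    # NOTE: pickle.load can execute arbitrary code embedded in the file it
    # reads, so persisted stores should only be loaded from trusted paths.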

    async def add_vectors(
        self,
        vectors: np.ndarray | list[np.ndarray],
        ids: list[str] | None = None,
        metadata: list[dict[str, Any]] | None = None,
    ) -> list[str]:
        """Add vectors to memory."""
        if not self._initialized:
            await self.initialize()

        # Convert to numpy array
        if isinstance(vectors, list):
            vectors = np.array(vectors, dtype=np.float32)
        else:
            vectors = vectors.astype(np.float32)

        # Ensure 2D array
        if vectors.ndim == 1:
            vectors = vectors.reshape(1, -1)

        # Generate IDs if not provided
        if ids is None:
            ids = [str(uuid4()) for _ in range(len(vectors))]

        # Store vectors and metadata
        for i, vector_id in enumerate(ids):
            self.vectors[vector_id] = vectors[i]
            if metadata and i < len(metadata):
                self.metadata_store[vector_id] = metadata[i]
            else:
                self.metadata_store[vector_id] = {}

        return ids
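    # NOTE: when explicit ids are given, the write loop is driven by ids:
    # extra rows in `vectors` beyond len(ids) are silently dropped, and a
    # metadata list shorter than ids is padded with empty dicts.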

    async def get_vectors(
        self,
        ids: list[str],
        include_metadata: bool = True,
    ) -> list[tuple[np.ndarray | None, dict[str, Any] | None]]:
        """Get vectors by ID; unknown IDs yield a (None, None) entry."""
        if not self._initialized:
            await self.initialize()

        results = []
        for vector_id in ids:
            if vector_id in self.vectors:
                vector = self.vectors[vector_id]
                meta = self.metadata_store.get(vector_id) if include_metadata else None
                results.append((vector, meta))
            else:
                results.append((None, None))

        return results

    async def delete_vectors(self, ids: list[str]) -> int:
        """Delete vectors by ID."""
        if not self._initialized:
            await self.initialize()

        deleted = 0
        for vector_id in ids:
            if vector_id in self.vectors:
                del self.vectors[vector_id]
                self.metadata_store.pop(vector_id, None)
                deleted += 1

        return deleted

    async def search(
        self,
        query_vector: np.ndarray,
        k: int = 10,
        filter: dict[str, Any] | None = None,
        include_metadata: bool = True,
    ) -> list[tuple[str, float, dict[str, Any] | None]]:
        """Search for similar vectors using brute force."""
        if not self._initialized:
            await self.initialize()

        if not self.vectors:
            return []

        # Prepare query
        query = query_vector.astype(np.float32)
        if query.ndim == 1:
            query = query.reshape(1, -1)

        # Filter candidates
        candidates = []
        for vector_id, vector in self.vectors.items():
            # Apply metadata filter
            if filter:
                meta = self.metadata_store.get(vector_id, {})
                match = all(
                    meta.get(key) == value
                    for key, value in filter.items()
                )
                if not match:
                    continue

            candidates.append((vector_id, vector))

        if not candidates:
            return []

        # Calculate similarity scores via the common base-class helper
        scores = []
        for vector_id, vector in candidates:
            score = self._calculate_similarity(query[0], vector)
            scores.append((vector_id, score))

        # Sort by score (descending for similarity)
        scores.sort(key=lambda x: x[1], reverse=True)

        # Return top k
        results = []
        for vector_id, score in scores[:k]:
            meta = self.metadata_store.get(vector_id) if include_metadata else None
            results.append((vector_id, score, meta))

        return results
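    # NOTE: each search scans every stored vector, so query cost grows
    # linearly with the number of vectors; the metadata filter is an AND of
    # exact equality checks, with no range or substring predicates.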

    async def update_metadata(
        self,
        ids: list[str],
        metadata: list[dict[str, Any]],
    ) -> int:
        """Update metadata for vectors."""
        if not self._initialized:
            await self.initialize()

        updated = 0
        for vector_id, meta in zip(ids, metadata, strict=False):
            if vector_id in self.vectors:
                self.metadata_store[vector_id] = meta
                updated += 1

        return updated

    async def count(self, filter: dict[str, Any] | None = None) -> int:
        """Count vectors."""
        if not self._initialized:
            await self.initialize()

        if filter is None:
            return len(self.vectors)

        # Count with filter
        count = 0
        for vector_id in self.vectors:
            meta = self.metadata_store.get(vector_id, {})
            match = all(
                meta.get(key) == value
                for key, value in filter.items()
            )
            if match:
                count += 1

        return count

    async def clear(self) -> None:
        """Clear all vectors."""
        if not self._initialized:
            await self.initialize()

        self.vectors.clear()
        self.metadata_store.clear()
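

# Usage sketch: a minimal example of the async API above. It assumes the
# VectorStore base class accepts a plain config dict and understands a
# "dimensions" key (the actual config schema is defined in .base, so the
# dict below is illustrative, not authoritative):
#
#     import asyncio
#     import numpy as np
#
#     async def demo() -> None:
#         store = MemoryVectorStore({"dimensions": 3})
#         await store.initialize()
#         ids = await store.add_vectors(
#             np.eye(3, dtype=np.float32),
#             metadata=[{"label": "x"}, {"label": "y"}, {"label": "z"}],
#         )
#         hits = await store.search(np.array([1.0, 0.0, 0.0]), k=2)
#         # hits: [(id, score, metadata), ...] sorted by descending score
#         await store.close()
#
#     asyncio.run(demo())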