Coverage for src/dataknobs_data/vector/stores/memory.py: 15%

105 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:23 -0700

1"""In-memory vector store implementation.""" 

2 

3from __future__ import annotations 

4 

5from typing import Any 

6from uuid import uuid4 

7 

8import numpy as np 

9 

10from .base import VectorStore 

11 

12 

class MemoryVectorStore(VectorStore):
    """Simple in-memory vector store for testing and development.

    This implementation stores vectors in memory using numpy arrays
    and performs brute-force search. Suitable for small datasets
    and testing scenarios.
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize memory vector store.

        Args:
            config: Optional configuration dict, passed to the base class.
        """
        super().__init__(config)
        self.vectors: dict[str, np.ndarray] = {}  # id -> float32 vector
        self.metadata_store: dict[str, dict[str, Any]] = {}  # id -> metadata

    async def initialize(self) -> None:
        """Initialize the store (no external resources to acquire)."""
        self._initialized = True

    async def close(self) -> None:
        """Close the store (no external resources to release)."""
        self._initialized = False

    async def add_vectors(
        self,
        vectors: np.ndarray | list[np.ndarray],
        ids: list[str] | None = None,
        metadata: list[dict[str, Any]] | None = None,
    ) -> list[str]:
        """Add vectors to memory.

        Args:
            vectors: 2D array (or list of 1D arrays) of vectors to store.
                A single 1D vector is accepted and treated as one row.
            ids: Optional explicit IDs. Generated as UUID4 strings if omitted.
            metadata: Optional per-vector metadata dicts. Entries beyond
                ``len(metadata)`` default to an empty dict.

        Returns:
            The list of IDs under which the vectors were stored.

        Raises:
            ValueError: If ``ids`` is provided but its length does not
                match the number of vectors.
        """
        if not self._initialized:
            await self.initialize()

        # Normalize to a float32 2D array. Both np.array and astype copy,
        # so stored rows do not alias the caller's buffer.
        if isinstance(vectors, list):
            vectors = np.array(vectors, dtype=np.float32)
        else:
            vectors = vectors.astype(np.float32)
        if vectors.ndim == 1:
            vectors = vectors.reshape(1, -1)

        if ids is None:
            ids = [str(uuid4()) for _ in range(len(vectors))]
        elif len(ids) != len(vectors):
            # Previously a mismatch raised an opaque IndexError (too many
            # ids) or silently dropped vectors (too few); fail fast instead.
            raise ValueError(
                f"Number of ids ({len(ids)}) does not match "
                f"number of vectors ({len(vectors)})"
            )

        # Store vectors and metadata keyed by ID.
        for i, vector_id in enumerate(ids):
            self.vectors[vector_id] = vectors[i]
            if metadata and i < len(metadata):
                self.metadata_store[vector_id] = metadata[i]
            else:
                self.metadata_store[vector_id] = {}

        return ids

    async def get_vectors(
        self,
        ids: list[str],
        include_metadata: bool = True,
    ) -> list[tuple[np.ndarray | None, dict[str, Any] | None]]:
        """Get vectors by ID.

        Args:
            ids: IDs to look up.
            include_metadata: When False, metadata slots are None.

        Returns:
            One ``(vector, metadata)`` tuple per requested ID, in order.
            Missing IDs yield ``(None, None)``.
        """
        if not self._initialized:
            await self.initialize()

        results: list[tuple[np.ndarray | None, dict[str, Any] | None]] = []
        for vector_id in ids:
            if vector_id in self.vectors:
                vector = self.vectors[vector_id]
                meta = self.metadata_store.get(vector_id) if include_metadata else None
                results.append((vector, meta))
            else:
                results.append((None, None))

        return results

    async def delete_vectors(self, ids: list[str]) -> int:
        """Delete vectors by ID.

        Args:
            ids: IDs to delete; unknown IDs are ignored.

        Returns:
            The number of vectors actually deleted.
        """
        if not self._initialized:
            await self.initialize()

        deleted = 0
        for vector_id in ids:
            if vector_id in self.vectors:
                del self.vectors[vector_id]
                self.metadata_store.pop(vector_id, None)
                deleted += 1

        return deleted

    async def search(
        self,
        query_vector: np.ndarray,
        k: int = 10,
        filter: dict[str, Any] | None = None,
        include_metadata: bool = True,
    ) -> list[tuple[str, float, dict[str, Any] | None]]:
        """Search for similar vectors using brute force.

        Args:
            query_vector: Query vector (1D, or 2D with one row).
            k: Maximum number of results to return.
            filter: Optional exact-match metadata filter; all key/value
                pairs must match a vector's metadata for it to be a
                candidate.
            include_metadata: When False, metadata slots are None.

        Returns:
            Up to ``k`` ``(id, score, metadata)`` tuples, sorted by
            descending similarity score.
        """
        if not self._initialized:
            await self.initialize()

        # Negative k would otherwise slice incorrectly (scores[:-1]).
        if not self.vectors or k <= 0:
            return []

        # Prepare query as a float32 row vector.
        query = query_vector.astype(np.float32)
        if query.ndim == 1:
            query = query.reshape(1, -1)

        # Collect candidates, applying the metadata filter if given.
        candidates = []
        for vector_id, vector in self.vectors.items():
            if filter:
                meta = self.metadata_store.get(vector_id, {})
                match = all(
                    meta.get(key) == value
                    for key, value in filter.items()
                )
                if not match:
                    continue

            candidates.append((vector_id, vector))

        if not candidates:
            return []

        # Score every candidate with the shared similarity method.
        scores = []
        for vector_id, vector in candidates:
            score = self._calculate_similarity(query[0], vector)
            scores.append((vector_id, score))

        # Sort by score (descending for similarity).
        scores.sort(key=lambda x: x[1], reverse=True)

        # Assemble the top-k results.
        results = []
        for vector_id, score in scores[:k]:
            meta = self.metadata_store.get(vector_id) if include_metadata else None
            results.append((vector_id, score, meta))

        return results

    async def update_metadata(
        self,
        ids: list[str],
        metadata: list[dict[str, Any]],
    ) -> int:
        """Update metadata for vectors.

        Args:
            ids: IDs whose metadata should be replaced.
            metadata: Replacement metadata, aligned with ``ids``. Extra
                entries on either side are ignored (strict=False zip).

        Returns:
            The number of vectors whose metadata was updated.
        """
        if not self._initialized:
            await self.initialize()

        updated = 0
        for vector_id, meta in zip(ids, metadata, strict=False):
            if vector_id in self.vectors:
                self.metadata_store[vector_id] = meta
                updated += 1

        return updated

    async def count(self, filter: dict[str, Any] | None = None) -> int:
        """Count vectors, optionally restricted by an exact-match filter.

        Args:
            filter: Optional metadata filter with the same semantics as
                in :meth:`search`.

        Returns:
            The number of stored vectors matching the filter (all
            vectors when ``filter`` is None).
        """
        if not self._initialized:
            await self.initialize()

        if filter is None:
            return len(self.vectors)

        # Count with filter applied to each vector's metadata.
        count = 0
        for vector_id in self.vectors:
            meta = self.metadata_store.get(vector_id, {})
            match = all(
                meta.get(key) == value
                for key, value in filter.items()
            )
            if match:
                count += 1

        return count

    async def clear(self) -> None:
        """Remove all vectors and metadata from the store."""
        if not self._initialized:
            await self.initialize()

        self.vectors.clear()
        self.metadata_store.clear()