# src/dataknobs_data/vector/stores/memory.py
1"""In-memory vector store implementation."""
3from __future__ import annotations
5import os
6import pickle
7from typing import Any
8from uuid import uuid4
10import numpy as np
12from .base import VectorStore


class MemoryVectorStore(VectorStore):
    """Simple in-memory vector store for testing and development.

    This implementation stores vectors in memory using numpy arrays
    and performs brute-force search. Suitable for small datasets
    and testing scenarios.
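
    Example (an illustrative sketch; assumes the base ``VectorStore``
    accepts ``dimensions`` and ``metric`` keys in its config dict)::

        store = MemoryVectorStore({"dimensions": 3, "metric": "cosine"})
        await store.initialize()
        ids = await store.add_vectors(np.array([[0.1, 0.2, 0.3]]))
        hits = await store.search(np.array([0.1, 0.2, 0.3]), k=1)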
21 """
23 def __init__(self, config: dict[str, Any] | None = None):
24 """Initialize memory vector store."""
25 super().__init__(config)
26 self.vectors = {} # id -> vector
27 self.metadata_store = {} # id -> metadata

    async def initialize(self) -> None:
        """Initialize the store."""
        if self._initialized:
            return

        # Load existing data if persist path exists
        if self.persist_path and os.path.exists(self.persist_path):
            await self.load()

        self._initialized = True

    async def close(self) -> None:
        """Save and close the store."""
        if self.persist_path and self._initialized:
            await self.save()
        self._initialized = False

    async def save(self) -> None:
        """Save vectors and metadata to disk."""
        if not self.persist_path:
            return

        # Create the parent directory if needed (dirname is empty for a
        # bare filename, in which case there is nothing to create)
        parent = os.path.dirname(self.persist_path)
        if parent:
            os.makedirs(parent, exist_ok=True)

        # Save all data
        with open(self.persist_path, "wb") as f:
            pickle.dump({
                "vectors": {k: v.tolist() for k, v in self.vectors.items()},
                "metadata_store": self.metadata_store,
                "config": {
                    "dimensions": self.dimensions,
                    "metric": self.metric.value if hasattr(self.metric, "value") else str(self.metric),
                },
            }, f)

    async def load(self) -> None:
        """Load vectors and metadata from disk."""
        if not self.persist_path or not os.path.exists(self.persist_path):
            return

        with open(self.persist_path, "rb") as f:
            data = pickle.load(f)
            # Convert lists back to numpy arrays
            self.vectors = {k: np.array(v, dtype=np.float32) for k, v in data["vectors"].items()}
            self.metadata_store = data["metadata_store"]
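
    # Persistence round-trip sketch (illustrative; assumes the config dict
    # supplies a "persist_path" key, and that the pickle file comes from a
    # trusted source, since pickle.load() can execute arbitrary code):
    #
    #     await store.save()
    #     restored = MemoryVectorStore({"persist_path": store.persist_path})
    #     await restored.load()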

    async def add_vectors(
        self,
        vectors: np.ndarray | list[np.ndarray],
        ids: list[str] | None = None,
        metadata: list[dict[str, Any]] | None = None,
    ) -> list[str]:
        """Add vectors to memory."""
        if not self._initialized:
            await self.initialize()

        # Convert to numpy array
        if isinstance(vectors, list):
            vectors = np.array(vectors, dtype=np.float32)
        else:
            vectors = vectors.astype(np.float32)

        # Ensure 2D array
        if vectors.ndim == 1:
            vectors = vectors.reshape(1, -1)

        # Generate IDs if not provided
        if ids is None:
            ids = [str(uuid4()) for _ in range(len(vectors))]

        # Store vectors and metadata
        for i, vector_id in enumerate(ids):
            self.vectors[vector_id] = vectors[i]
            if metadata and i < len(metadata):
                self.metadata_store[vector_id] = metadata[i]
            else:
                self.metadata_store[vector_id] = {}

        return ids
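
    # Illustrative call (hypothetical data; IDs are generated when omitted):
    #
    #     ids = await store.add_vectors(
    #         [np.array([0.1, 0.2]), np.array([0.3, 0.4])],
    #         metadata=[{"source": "a"}, {"source": "b"}],
    #     )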

    async def get_vectors(
        self,
        ids: list[str],
        include_metadata: bool = True,
    ) -> list[tuple[np.ndarray | None, dict[str, Any] | None]]:
        """Get vectors by ID; a missing ID yields a (None, None) entry."""
        if not self._initialized:
            await self.initialize()

        results = []
        for vector_id in ids:
            if vector_id in self.vectors:
                vector = self.vectors[vector_id]
                meta = self.metadata_store.get(vector_id) if include_metadata else None
                results.append((vector, meta))
            else:
                results.append((None, None))

        return results

    async def delete_vectors(self, ids: list[str]) -> int:
        """Delete vectors by ID."""
        if not self._initialized:
            await self.initialize()

        deleted = 0
        for vector_id in ids:
            if vector_id in self.vectors:
                del self.vectors[vector_id]
                self.metadata_store.pop(vector_id, None)
                deleted += 1

        return deleted

    async def search(
        self,
        query_vector: np.ndarray,
        k: int = 10,
        filter: dict[str, Any] | None = None,
        include_metadata: bool = True,
    ) -> list[tuple[str, float, dict[str, Any] | None]]:
        """Search for similar vectors using brute force."""
        if not self._initialized:
            await self.initialize()

        if not self.vectors:
            return []

        # Prepare query
        query = query_vector.astype(np.float32)
        if query.ndim == 1:
            query = query.reshape(1, -1)

        # Filter candidates
        candidates = []
        for vector_id, vector in self.vectors.items():
            # Apply metadata filter (exact equality on every key)
            if filter:
                meta = self.metadata_store.get(vector_id, {})
                match = all(
                    meta.get(key) == value
                    for key, value in filter.items()
                )
                if not match:
                    continue

            candidates.append((vector_id, vector))

        if not candidates:
            return []

        # Calculate similarity scores using the shared base-class helper
        scores = []
        for vector_id, vector in candidates:
            score = self._calculate_similarity(query[0], vector)
            scores.append((vector_id, score))

        # Sort by score (descending for similarity)
        scores.sort(key=lambda x: x[1], reverse=True)

        # Return top k
        results = []
        for vector_id, score in scores[:k]:
            meta = self.metadata_store.get(vector_id) if include_metadata else None
            results.append((vector_id, score, meta))

        return results
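
    # Illustrative filtered search (hypothetical metadata; the filter matches
    # on exact equality of every key/value pair):
    #
    #     hits = await store.search(query, k=5, filter={"lang": "en"})
    #     for vector_id, score, meta in hits:
    #         ...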

    async def update_metadata(
        self,
        ids: list[str],
        metadata: list[dict[str, Any]],
    ) -> int:
        """Update metadata for vectors."""
        if not self._initialized:
            await self.initialize()

        updated = 0
        for vector_id, meta in zip(ids, metadata, strict=False):
            if vector_id in self.vectors:
                self.metadata_store[vector_id] = meta
                updated += 1

        return updated

    async def count(self, filter: dict[str, Any] | None = None) -> int:
        """Count vectors."""
        if not self._initialized:
            await self.initialize()

        if filter is None:
            return len(self.vectors)

        # Count with filter
        count = 0
        for vector_id in self.vectors:
            meta = self.metadata_store.get(vector_id, {})
            match = all(
                meta.get(key) == value
                for key, value in filter.items()
            )
            if match:
                count += 1

        return count

    async def clear(self) -> None:
        """Clear all vectors."""
        if not self._initialized:
            await self.initialize()

        self.vectors.clear()
        self.metadata_store.clear()
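

if __name__ == "__main__":
    # Minimal smoke test (an illustrative sketch, not part of the module's
    # API; assumes the base VectorStore accepts "dimensions" and "metric"
    # config keys and provides _calculate_similarity).
    import asyncio

    async def _demo() -> None:
        store = MemoryVectorStore({"dimensions": 3, "metric": "cosine"})
        await store.initialize()
        ids = await store.add_vectors(
            np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]], dtype=np.float32),
            metadata=[{"label": "x"}, {"label": "y"}],
        )
        hits = await store.search(np.array([1.0, 0.1, 0.0]), k=1)
        print("added:", ids)
        print("top hit:", hits)
        await store.close()

    asyncio.run(_demo())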