1"""Vector store performance benchmarks."""
3from __future__ import annotations
5import asyncio
6import logging
7import time
8from dataclasses import dataclass
9from typing import Any, TYPE_CHECKING
11import numpy as np
13if TYPE_CHECKING:
14 from .stores.base import VectorStore
17logger = logging.getLogger(__name__)

@dataclass
class BenchmarkResult:
    """Results from a benchmark run."""

    operation: str
    num_vectors: int
    vector_dim: int
    duration: float
    throughput: float
    memory_used: int | None = None
    latency_p50: float | None = None
    latency_p95: float | None = None
    latency_p99: float | None = None
    metadata: dict[str, Any] | None = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}

    def __str__(self) -> str:
        """String representation of results."""
        lines = [
            f"Operation: {self.operation}",
            f"Vectors: {self.num_vectors:,}",
            f"Dimensions: {self.vector_dim}",
            f"Duration: {self.duration:.3f}s",
            f"Throughput: {self.throughput:.0f} vectors/s",
        ]

        if self.latency_p50 is not None:
            lines.append(f"Latency P50: {self.latency_p50 * 1000:.2f}ms")
        if self.latency_p95 is not None:
            lines.append(f"Latency P95: {self.latency_p95 * 1000:.2f}ms")
        if self.latency_p99 is not None:
            lines.append(f"Latency P99: {self.latency_p99 * 1000:.2f}ms")
        if self.memory_used is not None:
            lines.append(f"Memory: {self.memory_used / (1024 * 1024):.1f}MB")

        return "\n".join(lines)

class VectorStoreBenchmark:
    """Benchmarks for vector store operations."""

    def __init__(self, store: VectorStore):
        """Initialize the benchmark with a vector store.

        Args:
            store: Vector store to benchmark
        """
        self.store = store
        self.results: list[BenchmarkResult] = []
        self.rng = np.random.default_rng()  # Create RNG once for all benchmarks

    async def benchmark_indexing(
        self,
        num_vectors: int = 10000,
        vector_dim: int = 128,
        batch_size: int = 100,
    ) -> BenchmarkResult:
        """Benchmark vector indexing performance.

        Args:
            num_vectors: Number of vectors to index
            vector_dim: Dimension of vectors
            batch_size: Batch size for indexing

        Returns:
            Benchmark results
        """
        logger.info(f"Benchmarking indexing: {num_vectors} vectors of dim {vector_dim}")

        # Generate random vectors
        vectors = self.rng.random((num_vectors, vector_dim), dtype=np.float32)
        ids = [str(i) for i in range(num_vectors)]
        metadata = [{"index": i} for i in range(num_vectors)]

        # Measure indexing time with a monotonic clock
        start_time = time.perf_counter()

        # Index in batches
        for i in range(0, num_vectors, batch_size):
            batch_end = min(i + batch_size, num_vectors)
            await self.store.add_vectors(
                vectors[i:batch_end],
                ids=ids[i:batch_end],
                metadata=metadata[i:batch_end],
            )

        duration = time.perf_counter() - start_time
        throughput = num_vectors / duration if duration > 0 else 0

        result = BenchmarkResult(
            operation="indexing",
            num_vectors=num_vectors,
            vector_dim=vector_dim,
            duration=duration,
            throughput=throughput,
            metadata={"batch_size": batch_size},
        )

        self.results.append(result)
        return result

    async def benchmark_search(
        self,
        num_queries: int = 1000,
        k: int = 10,
        vector_dim: int = 128,
    ) -> BenchmarkResult:
        """Benchmark vector search performance.

        Args:
            num_queries: Number of search queries
            k: Number of results per query
            vector_dim: Dimension of query vectors

        Returns:
            Benchmark results
        """
        logger.info(f"Benchmarking search: {num_queries} queries, k={k}")

        # Generate random query vectors
        queries = self.rng.random((num_queries, vector_dim), dtype=np.float32)

        # Measure per-query latencies with a monotonic, high-resolution clock
        latencies = []
        start_time = time.perf_counter()

        for i in range(num_queries):
            query_start = time.perf_counter()
            await self.store.search(queries[i], k=k)
            latencies.append(time.perf_counter() - query_start)

        duration = time.perf_counter() - start_time
        throughput = num_queries / duration if duration > 0 else 0

        # Calculate latency percentiles
        latencies.sort()
        p50 = latencies[len(latencies) // 2]
        p95 = latencies[int(len(latencies) * 0.95)]
        p99 = latencies[int(len(latencies) * 0.99)]

        result = BenchmarkResult(
            operation="search",
            num_vectors=await self.store.count(),
            vector_dim=vector_dim,
            duration=duration,
            throughput=throughput,
            latency_p50=p50,
            latency_p95=p95,
            latency_p99=p99,
            metadata={"num_queries": num_queries, "k": k},
        )

        self.results.append(result)
        return result

    async def benchmark_update(
        self,
        num_updates: int = 1000,
        vector_dim: int = 128,
    ) -> BenchmarkResult:
        """Benchmark vector update performance.

        Args:
            num_updates: Number of vectors to update
            vector_dim: Dimension of vectors

        Returns:
            Benchmark results
        """
        logger.info(f"Benchmarking updates: {num_updates} vectors")

        # Ensure enough vectors exist to update
        count = await self.store.count()
        if count < num_updates:
            # Add some vectors first
            await self.benchmark_indexing(num_updates, vector_dim)

        # Generate new vectors for updates
        vectors = self.rng.random((num_updates, vector_dim), dtype=np.float32)
        ids = [str(i) for i in range(num_updates)]
        metadata = [{"updated": True, "index": i} for i in range(num_updates)]

        # Measure update time (vectors and metadata)
        start_time = time.perf_counter()
        await self.store.update_vectors(vectors, ids, metadata)
        duration = time.perf_counter() - start_time
        throughput = num_updates / duration if duration > 0 else 0

        result = BenchmarkResult(
            operation="update",
            num_vectors=num_updates,
            vector_dim=vector_dim,
            duration=duration,
            throughput=throughput,
        )

        self.results.append(result)
        return result

    async def benchmark_delete(
        self,
        num_deletes: int = 1000,
    ) -> BenchmarkResult:
        """Benchmark vector deletion performance.

        Args:
            num_deletes: Number of vectors to delete

        Returns:
            Benchmark results
        """
        logger.info(f"Benchmarking deletion: {num_deletes} vectors")

        # Get the current vector count
        initial_count = await self.store.count()

        # Generate IDs to delete (the indexing benchmark assigns sequential string IDs)
        ids = [str(i) for i in range(min(num_deletes, initial_count))]

        # Measure deletion time
        start_time = time.perf_counter()
        deleted = await self.store.delete_vectors(ids)
        duration = time.perf_counter() - start_time

        throughput = deleted / duration if duration > 0 else 0

        result = BenchmarkResult(
            operation="delete",
            num_vectors=deleted,
            vector_dim=0,
            duration=duration,
            throughput=throughput,
        )

        self.results.append(result)
        return result

    async def benchmark_concurrent_operations(
        self,
        num_workers: int = 10,
        operations_per_worker: int = 100,
        vector_dim: int = 128,
    ) -> BenchmarkResult:
        """Benchmark concurrent operations.

        Args:
            num_workers: Number of concurrent workers
            operations_per_worker: Operations per worker
            vector_dim: Dimension of vectors

        Returns:
            Benchmark results
        """
        logger.info(f"Benchmarking concurrency: {num_workers} workers")

        async def worker(worker_id: int) -> float:
            """Worker function for concurrent operations."""
            start = time.perf_counter()

            for i in range(operations_per_worker):
                # Mix of operations
                if i % 4 == 0:
                    # Add a single vector as a 1-row batch, matching the batched add API
                    vector = self.rng.random((1, vector_dim), dtype=np.float32)
                    await self.store.add_vectors(
                        vector,
                        ids=[f"w{worker_id}_v{i}"],
                        metadata=[{"worker": worker_id}],
                    )
                else:
                    # Search
                    query = self.rng.random(vector_dim, dtype=np.float32)
                    await self.store.search(query, k=5)

            return time.perf_counter() - start

        # Run workers concurrently
        start_time = time.perf_counter()
        tasks = [worker(i) for i in range(num_workers)]
        worker_times = await asyncio.gather(*tasks)
        duration = time.perf_counter() - start_time

        total_ops = num_workers * operations_per_worker
        throughput = total_ops / duration if duration > 0 else 0

        # Calculate worker time statistics
        avg_worker_time = sum(worker_times) / len(worker_times)

        result = BenchmarkResult(
            operation="concurrent",
            num_vectors=total_ops,
            vector_dim=vector_dim,
            duration=duration,
            throughput=throughput,
            metadata={
                "num_workers": num_workers,
                "ops_per_worker": operations_per_worker,
                "avg_worker_time": avg_worker_time,
            },
        )

        self.results.append(result)
        return result

    async def run_full_benchmark(
        self,
        vector_dims: list[int] | None = None,
        num_vectors_list: list[int] | None = None,
    ) -> list[BenchmarkResult]:
        """Run a complete benchmark suite.

        Args:
            vector_dims: List of vector dimensions to test
            num_vectors_list: List of vector counts to test

        Returns:
            List of all benchmark results
        """
        if vector_dims is None:
            vector_dims = [128, 256, 512]
        if num_vectors_list is None:
            num_vectors_list = [1000, 10000, 50000]

        logger.info("Starting full benchmark suite")

        for dim in vector_dims:
            for num_vectors in num_vectors_list:
                # Clear the store between configurations
                await self.store.clear()

                # Run benchmarks
                await self.benchmark_indexing(num_vectors, dim)
                await self.benchmark_search(min(1000, num_vectors // 10), 10, dim)
                await self.benchmark_update(min(1000, num_vectors // 10), dim)
                await self.benchmark_delete(min(1000, num_vectors // 10))

        # Test concurrency
        await self.store.clear()
        await self.benchmark_concurrent_operations()

        return self.results

    def generate_report(self) -> str:
        """Generate a benchmark report.

        Returns:
            Formatted report string
        """
        if not self.results:
            return "No benchmark results available"

        lines = ["=" * 60, "Vector Store Benchmark Report", "=" * 60, ""]

        # Group results by operation
        by_operation = {}
        for result in self.results:
            if result.operation not in by_operation:
                by_operation[result.operation] = []
            by_operation[result.operation].append(result)

        for operation, results in by_operation.items():
            lines.append(f"\n{operation.upper()} Operations:")
            lines.append("-" * 40)

            for result in results:
                lines.append(str(result))
                lines.append("")

        return "\n".join(lines)
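

# A minimal driver sketch (illustrative, not part of the benchmark API): with
# any concrete VectorStore implementation, run the full suite and print the
# report. The small dimension/count values here are example choices only.
async def _example_full_benchmark(store: VectorStore) -> None:
    benchmark = VectorStoreBenchmark(store)
    await benchmark.run_full_benchmark(vector_dims=[128], num_vectors_list=[1000])
    print(benchmark.generate_report())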

class ComparativeBenchmark:
    """Compare performance across different vector stores."""

    def __init__(self, stores: dict[str, VectorStore]):
        """Initialize with multiple stores to compare.

        Args:
            stores: Dictionary of store name to store instance
        """
        self.stores = stores
        self.results: dict[str, list[BenchmarkResult]] = {}

    async def compare_indexing(
        self,
        num_vectors: int = 10000,
        vector_dim: int = 128,
    ) -> dict[str, BenchmarkResult]:
        """Compare indexing performance across stores.

        Args:
            num_vectors: Number of vectors to index
            vector_dim: Dimension of vectors

        Returns:
            Dictionary of store name to results
        """
        comparison = {}

        for name, store in self.stores.items():
            logger.info(f"Benchmarking {name}")
            benchmark = VectorStoreBenchmark(store)
            result = await benchmark.benchmark_indexing(num_vectors, vector_dim)
            comparison[name] = result

            if name not in self.results:
                self.results[name] = []
            self.results[name].append(result)

        return comparison

    async def compare_search(
        self,
        num_queries: int = 1000,
        k: int = 10,
        vector_dim: int = 128,
    ) -> dict[str, BenchmarkResult]:
        """Compare search performance across stores.

        Args:
            num_queries: Number of queries
            k: Results per query
            vector_dim: Query vector dimension

        Returns:
            Dictionary of store name to results
        """
        comparison = {}

        for name, store in self.stores.items():
            logger.info(f"Benchmarking {name} search")
            benchmark = VectorStoreBenchmark(store)
            result = await benchmark.benchmark_search(num_queries, k, vector_dim)
            comparison[name] = result

            if name not in self.results:
                self.results[name] = []
            self.results[name].append(result)

        return comparison

    def generate_comparison_report(self) -> str:
        """Generate a comparison report.

        Returns:
            Formatted comparison report
        """
        if not self.results:
            return "No comparison results available"

        lines = ["=" * 80, "Vector Store Comparison Report", "=" * 80, ""]

        # Find all operations
        all_operations = set()
        for store_results in self.results.values():
            for result in store_results:
                all_operations.add(result.operation)

        # Compare by operation
        for operation in all_operations:
            lines.append(f"\n{operation.upper()} Comparison:")
            lines.append("-" * 60)

            # Create comparison table
            table_data = []
            for store_name, store_results in self.results.items():
                for result in store_results:
                    if result.operation == operation:
                        table_data.append([
                            store_name,
                            f"{result.throughput:.0f} vec/s",
                            f"{result.duration:.3f}s",
                            f"{result.latency_p50 * 1000:.1f}ms"
                            if result.latency_p50 is not None
                            else "N/A",
                        ])

            # Format table
            if table_data:
                headers = ["Store", "Throughput", "Duration", "P50 Latency"]
                col_widths = [
                    max(len(h), max(len(row[i]) for row in table_data))
                    for i, h in enumerate(headers)
                ]

                # Header row
                header_line = " | ".join(
                    h.ljust(w) for h, w in zip(headers, col_widths, strict=False)
                )
                lines.append(header_line)
                lines.append("-" * len(header_line))

                # Data rows
                for row in table_data:
                    row_line = " | ".join(
                        cell.ljust(w) for cell, w in zip(row, col_widths, strict=False)
                    )
                    lines.append(row_line)

            lines.append("")

        return "\n".join(lines)

# Export main classes
__all__ = [
    "BenchmarkResult",
    "ComparativeBenchmark",
    "VectorStoreBenchmark",
]
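

# ---------------------------------------------------------------------------
# Minimal usage sketch. ``_InMemoryStore`` is a hypothetical stand-in that
# implements only the methods these benchmarks call (add_vectors, search,
# count, update_vectors, delete_vectors, clear); it is NOT this package's
# store API, just enough scaffolding to make the demo self-contained.
# ---------------------------------------------------------------------------
class _InMemoryStore:
    def __init__(self) -> None:
        self._vectors: dict[str, np.ndarray] = {}
        self._metadata: dict[str, dict[str, Any]] = {}

    async def add_vectors(self, vectors, ids, metadata=None):
        # Store each row under its ID; a single 1-D vector counts as one row
        for vec, vid, meta in zip(np.atleast_2d(vectors), ids, metadata or [{}] * len(ids)):
            self._vectors[vid] = vec
            self._metadata[vid] = meta

    async def search(self, query, k=10):
        # Brute-force nearest neighbors by Euclidean distance
        if not self._vectors:
            return []
        ids = list(self._vectors)
        dists = np.linalg.norm(np.stack([self._vectors[i] for i in ids]) - query, axis=1)
        return [(ids[i], float(dists[i])) for i in np.argsort(dists)[:k]]

    async def count(self) -> int:
        return len(self._vectors)

    async def update_vectors(self, vectors, ids, metadata):
        await self.add_vectors(vectors, ids, metadata)

    async def delete_vectors(self, ids) -> int:
        deleted = sum(1 for vid in ids if self._vectors.pop(vid, None) is not None)
        for vid in ids:
            self._metadata.pop(vid, None)
        return deleted

    async def clear(self) -> None:
        self._vectors.clear()
        self._metadata.clear()


if __name__ == "__main__":
    async def _demo() -> None:
        # Exercise the stub store with small example sizes and print a report
        benchmark = VectorStoreBenchmark(_InMemoryStore())  # type: ignore[arg-type]
        await benchmark.benchmark_indexing(num_vectors=1000, vector_dim=64)
        await benchmark.benchmark_search(num_queries=100, k=5, vector_dim=64)
        print(benchmark.generate_report())

    asyncio.run(_demo())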