Coverage for src / dataknobs_data / vector / benchmarks.py: 0%

204 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-26 15:45 -0700

1"""Vector store performance benchmarks.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import logging 

7import time 

8from dataclasses import dataclass 

9from typing import Any, TYPE_CHECKING 

10 

11import numpy as np 

12 

13if TYPE_CHECKING: 

14 from .stores.base import VectorStore 

15 

16 

17logger = logging.getLogger(__name__) 

18 

19 

@dataclass
class BenchmarkResult:
    """Results from a benchmark run."""

    operation: str
    num_vectors: int
    vector_dim: int
    duration: float
    throughput: float
    memory_used: int | None = None
    latency_p50: float | None = None
    latency_p95: float | None = None
    latency_p99: float | None = None
    metadata: dict[str, Any] | None = None

    def __post_init__(self):
        # Normalize a missing metadata mapping to an empty dict so callers
        # can always treat it as a dict.
        self.metadata = {} if self.metadata is None else self.metadata

    def __str__(self) -> str:
        """String representation of results."""
        parts = [
            f"Operation: {self.operation}",
            f"Vectors: {self.num_vectors:,}",
            f"Dimensions: {self.vector_dim}",
            f"Duration: {self.duration:.3f}s",
            f"Throughput: {self.throughput:.0f} vectors/s",
        ]

        # Optional latency percentiles, reported in milliseconds.
        for label, value in (
            ("P50", self.latency_p50),
            ("P95", self.latency_p95),
            ("P99", self.latency_p99),
        ):
            if value is not None:
                parts.append(f"Latency {label}: {value*1000:.2f}ms")

        if self.memory_used is not None:
            parts.append(f"Memory: {self.memory_used / (1024*1024):.1f}MB")

        return "\n".join(parts)

59 

60 

class VectorStoreBenchmark:
    """Benchmarks for vector store operations.

    Each ``benchmark_*`` method times one kind of operation against the
    wrapped store, appends a ``BenchmarkResult`` to ``self.results`` and
    returns it.  All intervals are measured with ``time.perf_counter()``,
    which is monotonic and high-resolution — ``time.time()`` can jump on
    wall-clock adjustments and skew benchmark numbers.
    """

    def __init__(self, store: VectorStore):
        """Initialize benchmark with a vector store.

        Args:
            store: Vector store to benchmark
        """
        self.store = store
        self.results: list[BenchmarkResult] = []
        self.rng = np.random.default_rng()  # Create RNG once for all benchmarks

    @staticmethod
    def _percentiles(
        latencies: list[float],
    ) -> tuple[float | None, float | None, float | None]:
        """Return (p50, p95, p99) from per-call latencies; Nones when empty.

        Sorts *latencies* in place and uses index-based (nearest-rank style)
        percentiles.
        """
        if not latencies:
            # Guard: indexing an empty list would raise IndexError.
            return None, None, None
        latencies.sort()
        n = len(latencies)
        return latencies[n // 2], latencies[int(n * 0.95)], latencies[int(n * 0.99)]

    async def benchmark_indexing(
        self,
        num_vectors: int = 10000,
        vector_dim: int = 128,
        batch_size: int = 100
    ) -> BenchmarkResult:
        """Benchmark vector indexing performance.

        Args:
            num_vectors: Number of vectors to index
            vector_dim: Dimension of vectors
            batch_size: Batch size for indexing

        Returns:
            Benchmark results
        """
        logger.info(
            "Benchmarking indexing: %d vectors of dim %d", num_vectors, vector_dim
        )

        # Generate random data up front so it is excluded from the timing.
        vectors = self.rng.random((num_vectors, vector_dim), dtype=np.float32)
        ids = [str(i) for i in range(num_vectors)]
        metadata = [{"index": i} for i in range(num_vectors)]

        # Measure indexing time
        start_time = time.perf_counter()

        # Index in batches, as a real caller would.
        for i in range(0, num_vectors, batch_size):
            batch_end = min(i + batch_size, num_vectors)
            await self.store.add_vectors(
                vectors[i:batch_end],
                ids=ids[i:batch_end],
                metadata=metadata[i:batch_end],
            )

        duration = time.perf_counter() - start_time
        throughput = num_vectors / duration if duration > 0 else 0

        result = BenchmarkResult(
            operation="indexing",
            num_vectors=num_vectors,
            vector_dim=vector_dim,
            duration=duration,
            throughput=throughput,
            metadata={"batch_size": batch_size},
        )

        self.results.append(result)
        return result

    async def benchmark_search(
        self,
        num_queries: int = 1000,
        k: int = 10,
        vector_dim: int = 128
    ) -> BenchmarkResult:
        """Benchmark vector search performance.

        Args:
            num_queries: Number of search queries
            k: Number of results per query
            vector_dim: Dimension of query vectors

        Returns:
            Benchmark results
        """
        logger.info("Benchmarking search: %d queries, k=%d", num_queries, k)

        # Generate random query vectors
        queries = self.rng.random((num_queries, vector_dim), dtype=np.float32)

        # Record one latency per query; results are discarded — only timing
        # matters here.
        latencies: list[float] = []
        start_time = time.perf_counter()

        for i in range(num_queries):
            query_start = time.perf_counter()
            await self.store.search(queries[i], k=k)
            latencies.append(time.perf_counter() - query_start)

        duration = time.perf_counter() - start_time
        throughput = num_queries / duration if duration > 0 else 0

        # Calculate percentiles (None when num_queries == 0).
        p50, p95, p99 = self._percentiles(latencies)

        result = BenchmarkResult(
            operation="search",
            num_vectors=await self.store.count(),
            vector_dim=vector_dim,
            duration=duration,
            throughput=throughput,
            latency_p50=p50,
            latency_p95=p95,
            latency_p99=p99,
            metadata={"num_queries": num_queries, "k": k},
        )

        self.results.append(result)
        return result

    async def benchmark_update(
        self,
        num_updates: int = 1000,
        vector_dim: int = 128
    ) -> BenchmarkResult:
        """Benchmark vector update performance.

        Args:
            num_updates: Number of vectors to update
            vector_dim: Dimension of vectors

        Returns:
            Benchmark results
        """
        logger.info("Benchmarking updates: %d vectors", num_updates)

        # Ensure there is something to update.
        count = await self.store.count()
        if count == 0:
            # Add some vectors first
            await self.benchmark_indexing(num_updates, vector_dim)

        # Generate new vectors for updates.  NOTE(review): assumes ids
        # "0".."num_updates-1" exist from a prior indexing run — confirm
        # against the store's update semantics for unknown ids.
        vectors = self.rng.random((num_updates, vector_dim), dtype=np.float32)
        ids = [str(i) for i in range(num_updates)]
        metadata = [{"updated": True, "index": i} for i in range(num_updates)]

        # Measure update time (vectors and metadata)
        start_time = time.perf_counter()
        await self.store.update_vectors(vectors, ids, metadata)
        duration = time.perf_counter() - start_time

        throughput = num_updates / duration if duration > 0 else 0

        result = BenchmarkResult(
            operation="update",
            num_vectors=num_updates,
            vector_dim=vector_dim,
            duration=duration,
            throughput=throughput,
        )

        self.results.append(result)
        return result

    async def benchmark_delete(
        self,
        num_deletes: int = 1000
    ) -> BenchmarkResult:
        """Benchmark vector deletion performance.

        Args:
            num_deletes: Number of vectors to delete

        Returns:
            Benchmark results
        """
        logger.info("Benchmarking deletion: %d vectors", num_deletes)

        # Never ask to delete more vectors than the store currently holds.
        initial_count = await self.store.count()
        ids = [str(i) for i in range(min(num_deletes, initial_count))]

        # Measure deletion time
        start_time = time.perf_counter()
        deleted = await self.store.delete_vectors(ids)
        duration = time.perf_counter() - start_time

        # Throughput is based on how many the store reports it deleted.
        throughput = deleted / duration if duration > 0 else 0

        result = BenchmarkResult(
            operation="delete",
            num_vectors=deleted,
            vector_dim=0,  # dimension is irrelevant for deletion
            duration=duration,
            throughput=throughput,
        )

        self.results.append(result)
        return result

    async def benchmark_concurrent_operations(
        self,
        num_workers: int = 10,
        operations_per_worker: int = 100,
        vector_dim: int = 128
    ) -> BenchmarkResult:
        """Benchmark concurrent operations.

        Args:
            num_workers: Number of concurrent workers
            operations_per_worker: Operations per worker
            vector_dim: Dimension of vectors

        Returns:
            Benchmark results
        """
        logger.info("Benchmarking concurrency: %d workers", num_workers)

        async def worker(worker_id: int) -> float:
            """Run a 1:3 add/search mix and return this worker's elapsed time."""
            start = time.perf_counter()

            for i in range(operations_per_worker):
                if i % 4 == 0:
                    # Add vector
                    vector = self.rng.random(vector_dim, dtype=np.float32)
                    await self.store.add_vectors(
                        vector,
                        ids=[f"w{worker_id}_v{i}"],
                        metadata=[{"worker": worker_id}],
                    )
                else:
                    # Search
                    query = self.rng.random(vector_dim, dtype=np.float32)
                    await self.store.search(query, k=5)

            return time.perf_counter() - start

        # Run workers concurrently
        start_time = time.perf_counter()
        tasks = [worker(i) for i in range(num_workers)]
        worker_times = await asyncio.gather(*tasks)
        duration = time.perf_counter() - start_time

        total_ops = num_workers * operations_per_worker
        throughput = total_ops / duration if duration > 0 else 0

        # Calculate worker time statistics
        avg_worker_time = sum(worker_times) / len(worker_times)

        result = BenchmarkResult(
            operation="concurrent",
            num_vectors=total_ops,
            vector_dim=vector_dim,
            duration=duration,
            throughput=throughput,
            metadata={
                "num_workers": num_workers,
                "ops_per_worker": operations_per_worker,
                "avg_worker_time": avg_worker_time,
            },
        )

        self.results.append(result)
        return result

    async def run_full_benchmark(
        self,
        vector_dims: list[int] | None = None,
        num_vectors_list: list[int] | None = None
    ) -> list[BenchmarkResult]:
        """Run a complete benchmark suite.

        Args:
            vector_dims: List of vector dimensions to test
            num_vectors_list: List of vector counts to test

        Returns:
            List of all benchmark results
        """
        if vector_dims is None:
            vector_dims = [128, 256, 512]
        if num_vectors_list is None:
            num_vectors_list = [1000, 10000, 50000]

        logger.info("Starting full benchmark suite")

        for dim in vector_dims:
            for num_vectors in num_vectors_list:
                # Start each configuration from an empty store.
                await self.store.clear()

                # Run benchmarks
                await self.benchmark_indexing(num_vectors, dim)
                await self.benchmark_search(min(1000, num_vectors // 10), 10, dim)
                await self.benchmark_update(min(1000, num_vectors // 10), dim)
                await self.benchmark_delete(min(1000, num_vectors // 10))

        # Test concurrency
        await self.store.clear()
        await self.benchmark_concurrent_operations()

        return self.results

    def generate_report(self) -> str:
        """Generate a benchmark report.

        Returns:
            Formatted report string
        """
        if not self.results:
            return "No benchmark results available"

        lines = ["=" * 60, "Vector Store Benchmark Report", "=" * 60, ""]

        # Group by operation, preserving insertion order.
        by_operation: dict[str, list[BenchmarkResult]] = {}
        for result in self.results:
            by_operation.setdefault(result.operation, []).append(result)

        for operation, results in by_operation.items():
            lines.append(f"\n{operation.upper()} Operations:")
            lines.append("-" * 40)

            for result in results:
                lines.append(str(result))
                lines.append("")

        return "\n".join(lines)

395 

396 

class ComparativeBenchmark:
    """Compare performance across different vector stores."""

    def __init__(self, stores: dict[str, VectorStore]):
        """Initialize with multiple stores to compare.

        Args:
            stores: Dictionary of store name to store instance
        """
        self.stores = stores
        # Per-store accumulated results, keyed by the store's display name.
        self.results: dict[str, list[BenchmarkResult]] = {}

    async def compare_indexing(
        self,
        num_vectors: int = 10000,
        vector_dim: int = 128
    ) -> dict[str, BenchmarkResult]:
        """Compare indexing performance across stores.

        Args:
            num_vectors: Number of vectors to index
            vector_dim: Dimension of vectors

        Returns:
            Dictionary of store name to results
        """
        comparison = {}

        for name, store in self.stores.items():
            logger.info("Benchmarking %s", name)
            benchmark = VectorStoreBenchmark(store)
            result = await benchmark.benchmark_indexing(num_vectors, vector_dim)
            comparison[name] = result
            self.results.setdefault(name, []).append(result)

        return comparison

    async def compare_search(
        self,
        num_queries: int = 1000,
        k: int = 10,
        vector_dim: int = 128
    ) -> dict[str, BenchmarkResult]:
        """Compare search performance across stores.

        Args:
            num_queries: Number of queries
            k: Results per query
            vector_dim: Query vector dimension

        Returns:
            Dictionary of store name to results
        """
        comparison = {}

        for name, store in self.stores.items():
            logger.info("Benchmarking %s search", name)
            benchmark = VectorStoreBenchmark(store)
            result = await benchmark.benchmark_search(num_queries, k, vector_dim)
            comparison[name] = result
            self.results.setdefault(name, []).append(result)

        return comparison

    def generate_comparison_report(self) -> str:
        """Generate a comparison report.

        Returns:
            Formatted comparison report
        """
        if not self.results:
            return "No comparison results available"

        lines = ["=" * 80, "Vector Store Comparison Report", "=" * 80, ""]

        # Find all operations
        all_operations = set()
        for store_results in self.results.values():
            for result in store_results:
                all_operations.add(result.operation)

        # Compare by operation; sorted() so section order is deterministic
        # (raw set iteration order varies run to run).
        for operation in sorted(all_operations):
            lines.append(f"\n{operation.upper()} Comparison:")
            lines.append("-" * 60)

            # Create comparison table
            table_data = []
            for store_name, store_results in self.results.items():
                for result in store_results:
                    if result.operation == operation:
                        table_data.append([
                            store_name,
                            f"{result.throughput:.0f} vec/s",
                            f"{result.duration:.3f}s",
                            # `is not None`: a genuine 0.0 latency must not
                            # be reported as "N/A" (0.0 is falsy).
                            f"{result.latency_p50*1000:.1f}ms"
                            if result.latency_p50 is not None else "N/A",
                        ])

            # Format table
            if table_data:
                headers = ["Store", "Throughput", "Duration", "P50 Latency"]
                # Each column is wide enough for its header and every cell.
                col_widths = [
                    max(len(h), max(len(row[i]) for row in table_data))
                    for i, h in enumerate(headers)
                ]

                # Header
                header_line = " | ".join(
                    h.ljust(w) for h, w in zip(headers, col_widths, strict=False)
                )
                lines.append(header_line)
                lines.append("-" * len(header_line))

                # Data rows
                for row in table_data:
                    row_line = " | ".join(
                        cell.ljust(w) for cell, w in zip(row, col_widths, strict=False)
                    )
                    lines.append(row_line)

            lines.append("")

        return "\n".join(lines)

526 

527 

# Public API of this module: result container plus the two benchmark drivers.
__all__ = [
    "BenchmarkResult",
    "ComparativeBenchmark",
    "VectorStoreBenchmark",
]