Coverage for src / dataknobs_data / backends / duckdb.py: 15%

468 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-26 15:45 -0700

1"""DuckDB backend implementation for analytical workloads. 

2 

3DuckDB is an embedded columnar database optimized for analytics, 

4providing 10-100x performance improvement over SQLite for 

5aggregations, joins, and analytical queries. 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import logging 

12import threading 

13from concurrent.futures import ThreadPoolExecutor 

14from pathlib import Path 

15from typing import Any, TYPE_CHECKING 

16 

17import duckdb 

18from dataknobs_config import ConfigurableBase 

19 

20from ..database import AsyncDatabase, SyncDatabase 

21from ..query import Query 

22from ..query_logic import ComplexQuery 

23from .sql_base import SQLQueryBuilder, SQLRecordSerializer, SQLTableManager 

24 

25if TYPE_CHECKING: 

26 from collections.abc import AsyncIterator, Iterator 

27 from ..records import Record 

28 from ..streaming import StreamConfig, StreamResult 

29 

30 

31logger = logging.getLogger(__name__) 

32 

33 

class AsyncDuckDBDatabase(AsyncDatabase, ConfigurableBase):  # type: ignore[misc]
    """Asynchronous DuckDB database backend for analytical workloads.

    DuckDB is an embedded columnar database optimized for analytics.
    Provides 10-100x performance improvement over SQLite for
    aggregations, joins, and analytical queries.

    Features:
    - Columnar storage for fast analytical queries
    - Parallel execution for multi-threaded query processing
    - Native Parquet integration for efficient data import/export
    - Advanced analytics support (window functions, CTEs, complex aggregations)

    Usage:
        ```python
        from dataknobs_data import async_database_factory

        # File-based database
        db = async_database_factory("duckdb:///path/to/data.duckdb")

        # In-memory database
        db = async_database_factory("duckdb:///:memory:")

        async with db:
            # Perform CRUD operations
            await db.create(record)
            results = await db.search(query)
        ```
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize async DuckDB database.

        Args:
            config: Configuration with the following optional keys:
                - path: Database file path (default: ":memory:")
                - table: Table name (default: "records")
                - timeout: Connection timeout in seconds (default: 5.0)
                - max_workers: Number of threads in pool (default: 4)
                - read_only: Open database in read-only mode (default: False)
        """
        super().__init__(config)
        config = config or {}
        self.db_path = config.get("path", ":memory:")
        self.table_name = config.get("table", "records")
        self.timeout = config.get("timeout", 5.0)
        self.max_workers = config.get("max_workers", 4)
        self.read_only = config.get("read_only", False)

        # Thread pool for async operations (DuckDB has no native async support)
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)

        # Reuse shared SQL infrastructure
        self.query_builder = SQLQueryBuilder(
            self.table_name,
            dialect="duckdb",
            param_style="qmark",  # DuckDB uses ? placeholders
        )
        self.serializer = SQLRecordSerializer()
        self.table_manager = SQLTableManager(
            self.table_name,
            dialect="duckdb",
        )

        self.conn: duckdb.DuckDBPyConnection | None = None
        self._connected = False
        # DuckDB connections are not safe for concurrent use from multiple
        # threads; this lock serializes all access from the thread pool.
        self._lock = threading.Lock()

    @classmethod
    def from_config(cls, config: dict) -> AsyncDuckDBDatabase:
        """Create from config dictionary."""
        return cls(config)

    async def _run(self, fn, /, *args):
        """Run a blocking callable on the thread pool and await its result.

        DuckDB's Python API is synchronous, so every database call is
        delegated to ``self.executor``.  ``asyncio.get_running_loop()`` is
        used instead of the deprecated ``get_event_loop()`` because this is
        only ever called from within a running coroutine.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, fn, *args)

    async def connect(self) -> None:
        """Connect to the DuckDB database."""
        if self._connected:
            return

        # Create directory if needed for file-based database
        if self.db_path != ":memory:":
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        # Connect to database (in thread pool since DuckDB is sync)
        self.conn = await self._run(self._connect_sync)

        # Create table if it doesn't exist
        await self._ensure_table()

        self._connected = True
        logger.info(f"Connected to async DuckDB database: {self.db_path}")

    def _connect_sync(self) -> duckdb.DuckDBPyConnection:
        """Synchronous connection helper."""
        return duckdb.connect(
            self.db_path,
            read_only=self.read_only,
        )

    async def close(self) -> None:
        """Close the database connection and shut down the thread pool.

        Note: the executor is shut down unconditionally, so this instance
        cannot be reconnected after ``close()``.
        """
        if self.conn:
            await self._run(self.conn.close)
            self.conn = None
            self._connected = False
            logger.info(f"Disconnected from async DuckDB database: {self.db_path}")

        # Shutdown executor
        self.executor.shutdown(wait=True)

    async def _ensure_table(self) -> None:
        """Ensure the table exists.

        Raises:
            RuntimeError: If the database is not connected.
        """
        if not self.conn:
            raise RuntimeError("Database not connected. Call connect() first.")

        await self._run(self._ensure_table_sync)

    def _ensure_table_sync(self) -> None:
        """Synchronous table creation."""
        # Skip table creation in read-only mode
        if self.read_only:
            return

        with self._lock:
            create_sql = self.table_manager.get_create_table_sql()
            self.conn.execute(create_sql)

    def _check_connection(self) -> None:
        """Check if database is connected.

        Raises:
            RuntimeError: If not connected.
        """
        if not self._connected or not self.conn:
            raise RuntimeError("Database not connected. Call connect() first.")

    async def create(self, record: Record) -> str:
        """Create a new record.

        Args:
            record: The record to create

        Returns:
            The record ID

        Raises:
            ValueError: If a record with the same ID already exists.
        """
        self._check_connection()
        return await self._run(self._create_sync, record)

    def _create_sync(self, record: Record) -> str:
        """Synchronous create implementation."""
        query, params = self.query_builder.build_create_query(record)

        try:
            with self._lock:
                self.conn.execute(query, params)
                # DuckDB doesn't support RETURNING, so we use the ID we generated
                return params[0]  # ID is the first parameter
        except duckdb.ConstraintException as e:
            raise ValueError(f"Record with ID {params[0]} already exists") from e

    async def read(self, id: str) -> Record | None:
        """Read a record by ID.

        Args:
            id: The record ID

        Returns:
            The record if found, None otherwise
        """
        self._check_connection()
        return await self._run(self._read_sync, id)

    def _read_sync(self, id: str) -> Record | None:
        """Synchronous read implementation."""
        query, params = self.query_builder.build_read_query(id)

        with self._lock:
            result = self.conn.execute(query, params).fetchone()
            # Read the cursor description while still holding the lock:
            # another pool thread could otherwise run a query and clobber it.
            columns = self.conn.description if result else None

        if result:
            # Convert tuple result to dict
            row_dict = {columns[i][0]: result[i] for i in range(len(columns))}
            record = SQLQueryBuilder.row_to_record(row_dict)
            # Populate storage_id from database ID (consistent with search())
            record.storage_id = str(row_dict['id'])
            return record
        return None

    async def update(self, id: str, record: Record) -> bool:
        """Update an existing record.

        Args:
            id: The record ID to update
            record: The record data to update with

        Returns:
            True if the record was updated, False if no record exists
        """
        self._check_connection()
        return await self._run(self._update_sync, id, record)

    def _update_sync(self, id: str, record: Record) -> bool:
        """Synchronous update implementation."""
        query, params = self.query_builder.build_update_query(id, record)

        with self._lock:
            # Check if record exists (DuckDB gives no affected-row count here)
            exists_query, exists_params = self.query_builder.build_exists_query(id)
            exists = self.conn.execute(exists_query, exists_params).fetchone() is not None

            if exists:
                self.conn.execute(query, params)
                return True

        logger.warning(f"Update affected 0 rows for id={id}. Record may not exist.")
        return False

    async def delete(self, id: str) -> bool:
        """Delete a record by ID.

        Args:
            id: The record ID

        Returns:
            True if deleted, False if not found
        """
        self._check_connection()
        return await self._run(self._delete_sync, id)

    def _delete_sync(self, id: str) -> bool:
        """Synchronous delete implementation."""
        query, params = self.query_builder.build_delete_query(id)

        with self._lock:
            # First check if the record exists
            exists_query, exists_params = self.query_builder.build_exists_query(id)
            exists = self.conn.execute(exists_query, exists_params).fetchone() is not None

            if exists:
                self.conn.execute(query, params)
                return True
            return False

    async def exists(self, id: str) -> bool:
        """Check if a record exists.

        Args:
            id: The record ID

        Returns:
            True if exists, False otherwise
        """
        self._check_connection()
        return await self._run(self._exists_sync, id)

    def _exists_sync(self, id: str) -> bool:
        """Synchronous exists implementation."""
        query, params = self.query_builder.build_exists_query(id)

        with self._lock:
            return self.conn.execute(query, params).fetchone() is not None

    async def search(self, query: Query | ComplexQuery) -> list[Record]:
        """Search for records matching a query.

        Args:
            query: The query specification

        Returns:
            List of matching records
        """
        self._check_connection()
        return await self._run(self._search_sync, query)

    def _search_sync(self, query: Query | ComplexQuery) -> list[Record]:
        """Synchronous search implementation."""
        # Handle ComplexQuery with native SQL support
        if isinstance(query, ComplexQuery):
            sql_query, params = self.query_builder.build_complex_search_query(query)
        else:
            sql_query, params = self.query_builder.build_search_query(query)

        with self._lock:
            results = self.conn.execute(sql_query, params).fetchall()
            columns = self.conn.description

        records = []
        for result in results:
            # Convert tuple result to dict
            row_dict = {columns[i][0]: result[i] for i in range(len(columns))}
            record = SQLQueryBuilder.row_to_record(row_dict)

            # Populate storage_id from database ID
            record.storage_id = str(row_dict['id'])

            records.append(record)

        # Apply field projection if specified
        if hasattr(query, 'fields') and query.fields:
            records = [r.project(query.fields) for r in records]

        return records

    async def count(self, query: Query | None = None) -> int:
        """Count records matching a query.

        Args:
            query: Optional query specification

        Returns:
            Count of matching records
        """
        self._check_connection()
        return await self._run(self._count_sync, query)

    def _count_sync(self, query: Query | None = None) -> int:
        """Synchronous count implementation."""
        sql_query, params = self.query_builder.build_count_query(query)

        with self._lock:
            result = self.conn.execute(sql_query, params).fetchone()
            return result[0] if result else 0

    async def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently.

        Args:
            records: List of records to create

        Returns:
            List of record IDs
        """
        if not records:
            return []

        self._check_connection()
        return await self._run(self._create_batch_sync, records)

    def _create_batch_sync(self, records: list[Record]) -> list[str]:
        """Synchronous batch create implementation."""
        # Use the shared batch create query builder
        query, params, ids = self.query_builder.build_batch_create_query(records)

        # Execute the batch insert in a transaction
        with self._lock:
            try:
                self.conn.begin()
                self.conn.execute(query, params)
                self.conn.commit()
                return ids
            except Exception:
                self.conn.rollback()
                raise

    async def update_batch(self, updates: list[tuple[str, Record]]) -> list[bool]:
        """Update multiple records efficiently.

        Args:
            updates: List of (record_id, record) tuples

        Returns:
            List of success indicators
        """
        if not updates:
            return []

        self._check_connection()
        return await self._run(self._update_batch_sync, updates)

    def _update_batch_sync(self, updates: list[tuple[str, Record]]) -> list[bool]:
        """Synchronous batch update implementation."""
        # Use the shared batch update query builder
        query, params = self.query_builder.build_batch_update_query(updates)

        # Execute the batch update in a transaction
        with self._lock:
            try:
                self.conn.begin()
                self.conn.execute(query, params)
                self.conn.commit()

                # Check which records were actually updated (i.e. exist)
                update_ids = [record_id for record_id, _ in updates]
                placeholders = ", ".join(["?" for _ in update_ids])
                check_query = f"SELECT id FROM {self.table_name} WHERE id IN ({placeholders})"

                rows = self.conn.execute(check_query, update_ids).fetchall()
                existing_ids = {row[0] for row in rows}

                # Return a success flag per update, preserving order
                return [record_id in existing_ids for record_id, _ in updates]
            except Exception:
                self.conn.rollback()
                raise

    async def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently.

        Args:
            ids: List of record IDs to delete

        Returns:
            List of success indicators
        """
        if not ids:
            return []

        self._check_connection()
        return await self._run(self._delete_batch_sync, ids)

    def _delete_batch_sync(self, ids: list[str]) -> list[bool]:
        """Synchronous batch delete implementation."""
        with self._lock:
            # Check which IDs exist before deletion
            placeholders = ", ".join(["?" for _ in ids])
            check_query = f"SELECT id FROM {self.table_name} WHERE id IN ({placeholders})"

            rows = self.conn.execute(check_query, ids).fetchall()
            existing_ids = {row[0] for row in rows}

            # Use the shared batch delete query builder
            query, params = self.query_builder.build_batch_delete_query(ids)

            # Execute the batch delete in a transaction
            try:
                self.conn.begin()
                self.conn.execute(query, params)
                self.conn.commit()

                # Return results based on which IDs existed, preserving order
                return [id in existing_ids for id in ids]
            except Exception:
                self.conn.rollback()
                raise

    def _initialize(self) -> None:
        """Initialize method - connection setup handled in connect()."""
        pass

    async def _count_all(self) -> int:
        """Count all records in the database."""
        self._check_connection()
        return await self._run(self._count_all_sync)

    def _count_all_sync(self) -> int:
        """Synchronous count all implementation."""
        with self._lock:
            result = self.conn.execute(f"SELECT COUNT(*) FROM {self.table_name}").fetchone()
            return result[0] if result else 0

    async def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None,
    ) -> AsyncIterator[Record]:
        """Stream records from database.

        Args:
            query: Optional query specification
            config: Stream configuration

        Yields:
            Records one at a time
        """
        from ..streaming import StreamConfig

        config = config or StreamConfig()
        query = query or Query()

        # Paginate through search results, yielding individual records
        offset = 0
        while True:
            # Fetch a batch
            query_copy = query.copy()
            query_copy.offset(offset).limit(config.batch_size)
            batch = await self.search(query_copy)

            if not batch:
                break

            for record in batch:
                yield record

            offset += len(batch)

            # If we got less than batch_size, we're done
            if len(batch) < config.batch_size:
                break

    async def stream_write(
        self,
        records: AsyncIterator[Record],
        config: StreamConfig | None = None,
    ) -> StreamResult:
        """Stream records into database.

        Args:
            records: Async iterator of records
            config: Stream configuration

        Returns:
            Stream result with statistics
        """
        import time

        from ..streaming import StreamConfig, StreamResult

        config = config or StreamConfig()
        batch = []
        total_written = 0
        start_time = time.time()

        async for record in records:
            batch.append(record)

            if len(batch) >= config.batch_size:
                # Write the batch
                await self.create_batch(batch)
                total_written += len(batch)
                batch = []

        # Write any remaining records
        if batch:
            await self.create_batch(batch)
            total_written += len(batch)

        elapsed = time.time() - start_time

        return StreamResult(
            total_processed=total_written,
            successful=total_written,
            failed=0,
            duration=elapsed,
            total_batches=(total_written + config.batch_size - 1) // config.batch_size,
        )

651 

652 

class SyncDuckDBDatabase(SyncDatabase, ConfigurableBase):  # type: ignore[misc]
    """Synchronous DuckDB database backend for analytical workloads.

    DuckDB is an embedded columnar database optimized for analytics.
    Provides 10-100x performance improvement over SQLite for
    aggregations, joins, and analytical queries.

    Features:
    - Columnar storage for fast analytical queries
    - Native Parquet integration for efficient data import/export
    - Advanced analytics support (window functions, CTEs, complex aggregations)

    Usage:
        ```python
        from dataknobs_data.backends.duckdb import SyncDuckDBDatabase

        # File-based database
        db = SyncDuckDBDatabase({"path": "/path/to/data.duckdb"})

        # In-memory database
        db = SyncDuckDBDatabase({"path": ":memory:"})

        with db:
            # Perform CRUD operations
            db.create(record)
            results = db.search(query)
        ```
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize sync DuckDB database.

        Args:
            config: Configuration with the following optional keys:
                - path: Database file path (default: ":memory:")
                - table: Table name (default: "records")
                - timeout: Connection timeout in seconds (default: 5.0)
                - read_only: Open database in read-only mode (default: False)
        """
        super().__init__(config)
        config = config or {}
        self.db_path = config.get("path", ":memory:")
        self.table_name = config.get("table", "records")
        self.timeout = config.get("timeout", 5.0)
        self.read_only = config.get("read_only", False)

        # Reuse shared SQL infrastructure
        self.query_builder = SQLQueryBuilder(
            self.table_name,
            dialect="duckdb",
            param_style="qmark",  # DuckDB uses ? placeholders
        )
        self.serializer = SQLRecordSerializer()
        self.table_manager = SQLTableManager(
            self.table_name,
            dialect="duckdb",
        )

        self.conn: duckdb.DuckDBPyConnection | None = None
        self._connected = False

    @classmethod
    def from_config(cls, config: dict) -> SyncDuckDBDatabase:
        """Create from config dictionary."""
        return cls(config)

    def connect(self) -> None:
        """Connect to the DuckDB database."""
        if self._connected:
            return

        # Create directory if needed for file-based database
        if self.db_path != ":memory:":
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        # Connect to database
        self.conn = duckdb.connect(
            self.db_path,
            read_only=self.read_only,
        )

        # Create table if it doesn't exist
        self._ensure_table()

        self._connected = True
        logger.info(f"Connected to sync DuckDB database: {self.db_path}")

    def close(self) -> None:
        """Close the database connection."""
        if self.conn:
            self.conn.close()
            self.conn = None
            self._connected = False
            logger.info(f"Disconnected from sync DuckDB database: {self.db_path}")

    def _ensure_table(self) -> None:
        """Ensure the table exists.

        Raises:
            RuntimeError: If the database is not connected.
        """
        if not self.conn:
            raise RuntimeError("Database not connected. Call connect() first.")

        # Skip table creation in read-only mode
        if self.read_only:
            return

        create_sql = self.table_manager.get_create_table_sql()
        self.conn.execute(create_sql)

    def _check_connection(self) -> None:
        """Check if database is connected.

        Raises:
            RuntimeError: If not connected.
        """
        if not self._connected or not self.conn:
            raise RuntimeError("Database not connected. Call connect() first.")

    def create(self, record: Record) -> str:
        """Create a new record.

        Args:
            record: The record to create

        Returns:
            The record ID

        Raises:
            ValueError: If a record with the same ID already exists.
        """
        self._check_connection()
        query, params = self.query_builder.build_create_query(record)

        try:
            self.conn.execute(query, params)
            # DuckDB doesn't support RETURNING, so we use the ID we generated
            return params[0]  # ID is the first parameter
        except duckdb.ConstraintException as e:
            raise ValueError(f"Record with ID {params[0]} already exists") from e

    def read(self, id: str) -> Record | None:
        """Read a record by ID.

        Args:
            id: The record ID

        Returns:
            The record if found, None otherwise
        """
        self._check_connection()
        query, params = self.query_builder.build_read_query(id)

        result = self.conn.execute(query, params).fetchone()

        if result:
            # Convert tuple result to dict
            columns = self.conn.description
            row_dict = {columns[i][0]: result[i] for i in range(len(columns))}
            record = SQLQueryBuilder.row_to_record(row_dict)
            # Populate storage_id from database ID (consistent with search())
            record.storage_id = str(row_dict['id'])
            return record
        return None

    def update(self, id: str, record: Record) -> bool:
        """Update an existing record.

        Args:
            id: The record ID to update
            record: The record data to update with

        Returns:
            True if the record was updated, False if no record exists
        """
        self._check_connection()
        query, params = self.query_builder.build_update_query(id, record)

        # Check if record exists (DuckDB gives no affected-row count here)
        exists_query, exists_params = self.query_builder.build_exists_query(id)
        exists = self.conn.execute(exists_query, exists_params).fetchone() is not None

        if exists:
            self.conn.execute(query, params)
            return True

        logger.warning(f"Update affected 0 rows for id={id}. Record may not exist.")
        return False

    def delete(self, id: str) -> bool:
        """Delete a record by ID.

        Args:
            id: The record ID

        Returns:
            True if deleted, False if not found
        """
        self._check_connection()
        query, params = self.query_builder.build_delete_query(id)

        # First check if the record exists
        exists_query, exists_params = self.query_builder.build_exists_query(id)
        exists = self.conn.execute(exists_query, exists_params).fetchone() is not None

        if exists:
            self.conn.execute(query, params)
            return True
        return False

    def exists(self, id: str) -> bool:
        """Check if a record exists.

        Args:
            id: The record ID

        Returns:
            True if exists, False otherwise
        """
        self._check_connection()
        query, params = self.query_builder.build_exists_query(id)

        return self.conn.execute(query, params).fetchone() is not None

    def search(self, query: Query | ComplexQuery) -> list[Record]:
        """Search for records matching a query.

        Args:
            query: The query specification

        Returns:
            List of matching records
        """
        self._check_connection()

        # Handle ComplexQuery with native SQL support
        if isinstance(query, ComplexQuery):
            sql_query, params = self.query_builder.build_complex_search_query(query)
        else:
            sql_query, params = self.query_builder.build_search_query(query)

        results = self.conn.execute(sql_query, params).fetchall()
        columns = self.conn.description

        records = []
        for result in results:
            # Convert tuple result to dict
            row_dict = {columns[i][0]: result[i] for i in range(len(columns))}
            record = SQLQueryBuilder.row_to_record(row_dict)
            # Populate storage_id from database ID
            record.storage_id = str(row_dict['id'])
            records.append(record)

        # Apply field projection if specified
        if hasattr(query, 'fields') and query.fields:
            records = [r.project(query.fields) for r in records]

        return records

    def count(self, query: Query | None = None) -> int:
        """Count records matching a query.

        Args:
            query: Optional query specification

        Returns:
            Count of matching records
        """
        self._check_connection()
        sql_query, params = self.query_builder.build_count_query(query)

        result = self.conn.execute(sql_query, params).fetchone()
        return result[0] if result else 0

    def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently.

        Args:
            records: List of records to create

        Returns:
            List of record IDs
        """
        if not records:
            return []

        self._check_connection()
        query, params, ids = self.query_builder.build_batch_create_query(records)

        # Execute the batch insert in a transaction
        try:
            self.conn.begin()
            self.conn.execute(query, params)
            self.conn.commit()
            return ids
        except Exception:
            self.conn.rollback()
            raise

    def update_batch(self, updates: list[tuple[str, Record]]) -> list[bool]:
        """Update multiple records efficiently.

        Args:
            updates: List of (record_id, record) tuples

        Returns:
            List of success indicators
        """
        if not updates:
            return []

        self._check_connection()
        query, params = self.query_builder.build_batch_update_query(updates)

        # Execute the batch update in a transaction
        try:
            self.conn.begin()
            self.conn.execute(query, params)
            self.conn.commit()

            # Check which records were actually updated (i.e. exist)
            update_ids = [record_id for record_id, _ in updates]
            placeholders = ", ".join(["?" for _ in update_ids])
            check_query = f"SELECT id FROM {self.table_name} WHERE id IN ({placeholders})"

            rows = self.conn.execute(check_query, update_ids).fetchall()
            existing_ids = {row[0] for row in rows}

            # Return a success flag per update, preserving order
            return [record_id in existing_ids for record_id, _ in updates]
        except Exception:
            self.conn.rollback()
            raise

    def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently.

        Args:
            ids: List of record IDs to delete

        Returns:
            List of success indicators
        """
        if not ids:
            return []

        self._check_connection()

        # Check which IDs exist before deletion
        placeholders = ", ".join(["?" for _ in ids])
        check_query = f"SELECT id FROM {self.table_name} WHERE id IN ({placeholders})"

        rows = self.conn.execute(check_query, ids).fetchall()
        existing_ids = {row[0] for row in rows}

        query, params = self.query_builder.build_batch_delete_query(ids)

        # Execute the batch delete in a transaction
        try:
            self.conn.begin()
            self.conn.execute(query, params)
            self.conn.commit()

            # Return results based on which IDs existed, preserving order
            return [id in existing_ids for id in ids]
        except Exception:
            self.conn.rollback()
            raise

    def _initialize(self) -> None:
        """Initialize method - connection setup handled in connect()."""
        pass

    def _count_all(self) -> int:
        """Count all records in the database."""
        self._check_connection()

        result = self.conn.execute(f"SELECT COUNT(*) FROM {self.table_name}").fetchone()
        return result[0] if result else 0

    def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None,
    ) -> Iterator[Record]:
        """Stream records from database.

        Args:
            query: Optional query specification
            config: Stream configuration

        Yields:
            Records one at a time
        """
        from ..streaming import StreamConfig

        config = config or StreamConfig()
        query = query or Query()

        # Paginate through search results, yielding individual records
        offset = 0
        while True:
            query_copy = query.copy()
            query_copy.offset(offset).limit(config.batch_size)
            batch = self.search(query_copy)

            if not batch:
                break

            for record in batch:
                yield record

            offset += len(batch)

            # If we got less than batch_size, we're done
            if len(batch) < config.batch_size:
                break

    def stream_write(
        self,
        records: Iterator[Record],
        config: StreamConfig | None = None,
    ) -> StreamResult:
        """Stream records into database.

        Args:
            records: Iterator of records
            config: Stream configuration

        Returns:
            Stream result with statistics
        """
        import time

        from ..streaming import StreamConfig, StreamResult

        config = config or StreamConfig()
        batch = []
        total_written = 0
        start_time = time.time()

        for record in records:
            batch.append(record)

            if len(batch) >= config.batch_size:
                self.create_batch(batch)
                total_written += len(batch)
                batch = []

        # Write any remaining records
        if batch:
            self.create_batch(batch)
            total_written += len(batch)

        elapsed = time.time() - start_time

        return StreamResult(
            total_processed=total_written,
            successful=total_written,
            failed=0,
            duration=elapsed,
            total_batches=(total_written + config.batch_size - 1) // config.batch_size,
        )

1102 )