Coverage for src/dataknobs_data/database.py: 26%

279 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:36 -0700

1"""Database abstraction layer providing unified interfaces for data operations. 

2 

3This module defines abstract base classes for synchronous and asynchronous database 

4operations, supporting CRUD, querying, streaming, and schema management across 

5different backend database implementations. 

6""" 

7 

8from __future__ import annotations 

9 

10from abc import ABC, abstractmethod 

11from typing import Any, TYPE_CHECKING 

12 

13from .database_utils import ensure_record_id, process_search_results 

14from .query import Query 

15from .schema import DatabaseSchema, FieldSchema 

16 

17if TYPE_CHECKING: 

18 from collections.abc import AsyncIterator, Callable, Iterator 

19 from .query_logic import ComplexQuery 

20 from .records import Record 

21 from .streaming import StreamConfig, StreamResult 

22 

23 

24class AsyncDatabase(ABC): 

25 """Abstract base class for async database implementations. 

26 

27 Provides a unified async interface for CRUD operations, querying, and streaming 

28 across different backend databases. Supports schema validation, batch operations, 

29 and complex queries with boolean logic. 

30 

31 Example: 

32 ```python 

33 from dataknobs_data import async_database_factory, Record, Query, Filter, Operator 

34 

35 # Create async database 

36 db = async_database_factory("memory") 

37 

38 # Use as async context manager 

39 async with db: 

40 # Create records 

41 id1 = await db.create(Record({"name": "Alice", "age": 30})) 

42 id2 = await db.create(Record({"name": "Bob", "age": 25})) 

43 

44 # Query records 

45 query = Query(filters=[Filter("age", Operator.GT, 25)]) 

46 results = await db.search(query) 

47 print(results) # [Alice's record] 

48 

49 # Update record 

50 await db.update(id1, Record({"name": "Alice", "age": 31})) 

51 

52 # Stream large datasets 

53 async for record in db.stream_read(): 

54 process_record(record) 

55 ``` 

56 """ 

57 

58 def __init__(self, config: dict[str, Any] | None = None, schema: DatabaseSchema | None = None): 

59 """Initialize the database with optional configuration. 

60 

61 Args: 

62 config: Backend-specific configuration parameters (may include 'schema' key) 

63 schema: Optional database schema (overrides config schema) 

64 

65 Example: 

66 ```python 

67 from dataknobs_data import AsyncDatabase 

68 from dataknobs_data.schema import DatabaseSchema 

69 from dataknobs_data.fields import FieldType 

70 

71 # With schema 

72 schema = DatabaseSchema.create( 

73 name=FieldType.STRING, 

74 age=FieldType.INTEGER 

75 ) 

76 db = AsyncDatabase(config={"path": "data.db"}, schema=schema) 

77 ``` 

78 """ 

79 config = config or {} 

80 

81 # Extract schema from config if present and no explicit schema provided 

82 if schema is None and "schema" in config: 

83 schema = self._extract_schema_from_config(config["schema"]) 

84 # Remove schema from config so backends don't see it 

85 config = {k: v for k, v in config.items() if k != "schema"} 

86 

87 self.config = config 

88 self.schema = schema or DatabaseSchema() 

89 self._initialize() 

90 

91 @staticmethod 

92 def _extract_schema_from_config(schema_config: Any) -> DatabaseSchema | None: 

93 """Extract schema from configuration. 

94  

95 Args: 

96 schema_config: Can be a DatabaseSchema, dict, or None 

97  

98 Returns: 

99 DatabaseSchema instance or None 

100 """ 

101 if isinstance(schema_config, DatabaseSchema): 

102 return schema_config 

103 elif isinstance(schema_config, dict): 

104 return DatabaseSchema.from_dict(schema_config) 

105 return None 

106 

107 def _initialize(self) -> None: # noqa: B027 

108 """Initialize the database backend. Override in subclasses if needed.""" 

109 # Default implementation does nothing - backends can override if needed 

110 

111 def _ensure_record_id(self, record: Record, record_id: str) -> Record: 

112 """Ensure a record has its ID set (delegates to utility function).""" 

113 return ensure_record_id(record, record_id) 

114 

115 def _prepare_record_for_storage(self, record: Record) -> tuple[Record, str]: 

116 """Prepare a record for storage by ensuring it has a storage_id. 

117  

118 Args: 

119 record: The record to prepare 

120  

121 Returns: 

122 Tuple of (prepared_record_copy, storage_id) 

123 """ 

124 import uuid 

125 # Make a copy to avoid modifying the original 

126 record_copy = record.copy(deep=True) 

127 

128 # Generate storage ID if not present 

129 if not record_copy.has_storage_id(): 

130 storage_id = str(uuid.uuid4()) 

131 record_copy.storage_id = storage_id 

132 else: 

133 storage_id = record_copy.storage_id 

134 

135 return record_copy, storage_id 

136 

137 def _prepare_record_from_storage(self, record: Record | None, storage_id: str) -> Record | None: 

138 """Prepare a record retrieved from storage by ensuring storage_id is set. 

139  

140 Args: 

141 record: The record retrieved from storage (or None) 

142 storage_id: The storage ID used to retrieve the record 

143  

144 Returns: 

145 Record with storage_id set, or None if record was None 

146 """ 

147 if record: 

148 record_copy = record.copy(deep=True) 

149 # Ensure storage_id is set 

150 if not record_copy.has_storage_id(): 

151 record_copy.storage_id = storage_id 

152 return record_copy 

153 return None 

154 

155 def _process_search_results( 

156 self, 

157 results: list[tuple[str, Record]], 

158 query: Query, 

159 deep_copy: bool = True 

160 ) -> list[Record]: 

161 """Process search results (delegates to utility function).""" 

162 return process_search_results(results, query, deep_copy) 

163 

164 def set_schema(self, schema: DatabaseSchema) -> None: 

165 """Set the database schema. 

166  

167 Args: 

168 schema: The database schema to use 

169 """ 

170 self.schema = schema 

171 

172 def add_field_schema(self, field_schema: FieldSchema) -> None: 

173 """Add a field to the database schema. 

174  

175 Args: 

176 field_schema: The field schema to add 

177 """ 

178 self.schema.add_field(field_schema) 

179 

180 def with_schema(self, **field_definitions) -> AsyncDatabase: 

181 """Set schema using field definitions. 

182  

183 Returns self for chaining. 

184  

185 Examples: 

186 db = AsyncMemoryDatabase().with_schema( 

187 content=FieldType.TEXT, 

188 embedding=(FieldType.VECTOR, {"dimensions": 384, "source_field": "content"}) 

189 ) 

190 """ 

191 self.schema = DatabaseSchema.create(**field_definitions) 

192 return self 

193 

194 @abstractmethod 

195 async def create(self, record: Record) -> str: 

196 """Create a new record in the database. 

197 

198 Args: 

199 record: The record to create 

200 

201 Returns: 

202 The ID of the created record 

203 """ 

204 raise NotImplementedError 

205 

206 @abstractmethod 

207 async def read(self, id: str) -> Record | None: 

208 """Read a record by ID. 

209 

210 Args: 

211 id: The record ID 

212 

213 Returns: 

214 The record if found, None otherwise 

215 """ 

216 raise NotImplementedError 

217 

218 @abstractmethod 

219 async def update(self, id: str, record: Record) -> bool: 

220 """Update an existing record. 

221 

222 Args: 

223 id: The record ID 

224 record: The updated record 

225 

226 Returns: 

227 True if the record was updated, False if not found 

228 """ 

229 raise NotImplementedError 

230 

231 @abstractmethod 

232 async def delete(self, id: str) -> bool: 

233 """Delete a record by ID. 

234 

235 Args: 

236 id: The record ID 

237 

238 Returns: 

239 True if the record was deleted, False if not found 

240 """ 

241 raise NotImplementedError 

242 

243 @abstractmethod 

244 async def search(self, query: Query | ComplexQuery) -> list[Record]: 

245 """Search for records matching a query. 

246 

247 Args: 

248 query: The search query (simple or complex) 

249 

250 Returns: 

251 List of matching records 

252 """ 

253 raise NotImplementedError 

254 

255 async def all(self) -> list[Record]: 

256 """Get all records from the database. 

257  

258 Returns: 

259 List of all records 

260 """ 

261 # Default implementation using search with empty query 

262 from .query import Query 

263 return await self.search(Query()) 

264 

    async def _search_with_complex_query(self, query: ComplexQuery) -> list[Record]:
        """Default implementation for ComplexQuery using in-memory filtering.

        Backends can override this for native boolean logic support.

        Args:
            query: Complex query with boolean logic

        Returns:
            List of matching records
        """
        # Fast path: if the complex query reduces to a simple Query, let the
        # backend's search() handle it (to_simple_query raises ValueError
        # when the boolean structure cannot be flattened).
        try:
            simple_query = query.to_simple_query()
            return await self.search(simple_query)
        except ValueError:
            # Can't convert - need to do in-memory filtering.
            # Fetch everything with an empty Query, then filter locally.
            all_records = await self.search(Query())

            # Keep only records satisfying the complex boolean condition.
            results = []
            for record in all_records:
                if query.matches(record):
                    results.append(record)

            # Apply sorting: specs are applied in reverse so the FIRST spec
            # ends up as the primary sort key (relies on sort stability).
            if query.sort_specs:
                for sort_spec in reversed(query.sort_specs):
                    reverse = sort_spec.order.value == "desc"
                    results.sort(
                        # NOTE(review): missing fields default to "" here; if a
                        # field holds non-string values this comparison could
                        # raise TypeError - confirm fields are homogeneous.
                        key=lambda r: r.get_value(sort_spec.field, ""),
                        reverse=reverse
                    )

            # Apply offset and limit (pagination) after sorting.
            if query.offset_value:
                results = results[query.offset_value:]
            if query.limit_value:
                results = results[:query.limit_value]

            # Apply field projection last, on the final page of results.
            if query.fields:
                results = [r.project(query.fields) for r in results]

            return results

311 

312 @abstractmethod 

313 async def exists(self, id: str) -> bool: 

314 """Check if a record exists. 

315 

316 Args: 

317 id: The record ID 

318 

319 Returns: 

320 True if the record exists, False otherwise 

321 """ 

322 raise NotImplementedError 

323 

324 async def upsert(self, id_or_record: str | Record, record: Record | None = None) -> str: 

325 """Update or insert a record. 

326  

327 Can be called as: 

328 - upsert(id, record) - explicit ID and record 

329 - upsert(record) - extract ID from record using Record's built-in logic 

330  

331 Args: 

332 id_or_record: Either an ID string or a Record 

333 record: The record to upsert (if first arg is ID) 

334  

335 Returns: 

336 The record ID 

337 """ 

338 import uuid 

339 

340 # Determine ID and record based on arguments 

341 if isinstance(id_or_record, str): 

342 # Called with explicit ID: upsert(id, record) 

343 id = id_or_record 

344 if record is None: 

345 raise ValueError("Record required when ID is provided") 

346 else: 

347 # Called with just record: upsert(record) 

348 record = id_or_record 

349 # Use Record's built-in ID property which handles all the logic 

350 id = record.id 

351 

352 if id is None: 

353 # Generate a new ID if none found 

354 id = str(uuid.uuid4()) # type: ignore[unreachable] 

355 # Set it on the record for future reference 

356 record.storage_id = id 

357 

358 # Now perform the upsert 

359 if await self.exists(id): 

360 await self.update(id, record) 

361 else: 

362 # Ensure the record has the storage_id set for create 

363 if not record.storage_id: 

364 record.storage_id = id 

365 created_id = await self.create(record) 

366 # Return the created ID (might be different from what we provided) 

367 return created_id or id 

368 return id 

369 

370 async def create_batch(self, records: list[Record]) -> list[str]: 

371 """Create multiple records in batch. 

372 

373 Args: 

374 records: List of records to create 

375 

376 Returns: 

377 List of created record IDs 

378 """ 

379 ids = [] 

380 for record in records: 

381 id = await self.create(record) 

382 ids.append(id) 

383 return ids 

384 

385 async def read_batch(self, ids: list[str]) -> list[Record | None]: 

386 """Read multiple records by ID. 

387 

388 Args: 

389 ids: List of record IDs 

390 

391 Returns: 

392 List of records (None for not found) 

393 """ 

394 records = [] 

395 for id in ids: 

396 record = await self.read(id) 

397 records.append(record) 

398 return records 

399 

400 async def delete_batch(self, ids: list[str]) -> list[bool]: 

401 """Delete multiple records by ID. 

402 

403 Args: 

404 ids: List of record IDs 

405 

406 Returns: 

407 List of deletion results 

408 """ 

409 results = [] 

410 for id in ids: 

411 result = await self.delete(id) 

412 results.append(result) 

413 return results 

414 

415 async def update_batch(self, updates: list[tuple[str, Record]]) -> list[bool]: 

416 """Update multiple records. 

417 

418 Default implementation calls update() for each ID/record pair. 

419 Override for better performance. 

420 

421 Args: 

422 updates: List of (id, record) tuples to update 

423 

424 Returns: 

425 List of success flags for each update 

426 """ 

427 results = [] 

428 for id, record in updates: 

429 result = await self.update(id, record) 

430 results.append(result) 

431 return results 

432 

433 async def count(self, query: Query | None = None) -> int: 

434 """Count records matching a query. 

435 

436 Args: 

437 query: Optional search query (counts all if None) 

438 

439 Returns: 

440 Number of matching records 

441 """ 

442 if query: 

443 results = await self.search(query) 

444 return len(results) 

445 else: 

446 return await self._count_all() 

447 

448 @abstractmethod 

449 async def _count_all(self) -> int: 

450 """Count all records in the database.""" 

451 raise NotImplementedError 

452 

453 async def clear(self) -> int: 

454 """Clear all records from the database. 

455 

456 Returns: 

457 Number of records deleted 

458 """ 

459 raise NotImplementedError 

460 

    async def connect(self) -> None:  # noqa: B027
        """Connect to the database. Override in subclasses if needed."""
        # Default implementation does nothing - many backends don't need explicit connection

    async def close(self) -> None:  # noqa: B027
        """Close the database connection. Override in subclasses if needed."""
        # Default implementation does nothing - many backends don't need explicit closing

    async def disconnect(self) -> None:
        """Disconnect from the database (alias for close)."""
        await self.close()

    async def __aenter__(self):
        """Async context manager entry: connect, then return self."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: always close; exceptions propagate."""
        # Returning None (falsy) means exceptions are not suppressed.
        await self.close()

481 

    @abstractmethod
    async def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> AsyncIterator[Record]:
        """Stream records from database.

        Yields records one at a time, fetching in batches internally.

        Args:
            query: Optional query to filter records
            config: Streaming configuration

        Yields:
            Records matching the query
        """
        raise NotImplementedError

    @abstractmethod
    async def stream_write(
        self,
        records: AsyncIterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into database.

        Accepts an iterator and writes in batches.

        Args:
            records: Iterator of records to write
            config: Streaming configuration

        Returns:
            Result of the streaming operation
        """
        raise NotImplementedError

519 

520 async def stream_transform( 

521 self, 

522 query: Query | None = None, 

523 transform: Callable[[Record], Record | None] | None = None, 

524 config: StreamConfig | None = None 

525 ) -> AsyncIterator[Record]: 

526 """Stream records through a transformation. 

527  

528 Default implementation, can be overridden for efficiency. 

529  

530 Args: 

531 query: Optional query to filter records 

532 transform: Optional transformation function 

533 config: Streaming configuration 

534  

535 Yields: 

536 Transformed records 

537 """ 

538 async for record in self.stream_read(query, config): 

539 if transform: 

540 transformed = transform(record) 

541 if transformed: # None means filter out 

542 yield transformed 

543 else: 

544 yield record 

545 

546 @classmethod 

547 async def from_backend(cls, backend: str, config: dict[str, Any] | None = None) -> AsyncDatabase: 

548 """Factory method to create and connect a database instance. 

549 

550 Args: 

551 backend: The backend type ("memory", "file", "s3", "postgres", "elasticsearch") 

552 config: Backend-specific configuration 

553 

554 Returns: 

555 Connected AsyncDatabase instance 

556 """ 

557 from .backends import BACKEND_REGISTRY 

558 

559 backend_class = BACKEND_REGISTRY.get(backend) 

560 if not backend_class: 

561 raise ValueError( 

562 f"Unknown backend: {backend}. Available: {list(BACKEND_REGISTRY.keys())}" 

563 ) 

564 

565 instance = backend_class(config) 

566 await instance.connect() 

567 return instance 

568 

569 

570class SyncDatabase(ABC): 

571 """Synchronous variant of the Database abstract base class. 

572 

573 Provides a unified synchronous interface for CRUD operations, querying, and streaming 

574 across different backend databases. Supports schema validation, batch operations, 

575 and complex queries with boolean logic. 

576 

577 Example: 

578 ```python 

579 from dataknobs_data import database_factory, Record, Query, Filter, Operator 

580 

581 # Create database 

582 db = database_factory("memory") 

583 

584 # Use as context manager 

585 with db: 

586 # Create records 

587 id1 = db.create(Record({"name": "Alice", "age": 30})) 

588 id2 = db.create(Record({"name": "Bob", "age": 25})) 

589 

590 # Query records 

591 query = Query(filters=[Filter("age", Operator.GT, 25)]) 

592 results = db.search(query) 

593 print(results) # [Alice's record] 

594 

595 # Update record 

596 db.update(id1, Record({"name": "Alice", "age": 31})) 

597 

598 # Stream large datasets 

599 for record in db.stream_read(): 

600 process_record(record) 

601 ``` 

602 """ 

603 

604 def __init__(self, config: dict[str, Any] | None = None, schema: DatabaseSchema | None = None): 

605 """Initialize the database with optional configuration. 

606 

607 Args: 

608 config: Backend-specific configuration parameters (may include 'schema' key) 

609 schema: Optional database schema (overrides config schema) 

610 

611 Example: 

612 ```python 

613 from dataknobs_data import SyncDatabase 

614 from dataknobs_data.schema import DatabaseSchema 

615 from dataknobs_data.fields import FieldType 

616 

617 # With schema 

618 schema = DatabaseSchema.create( 

619 name=FieldType.STRING, 

620 age=FieldType.INTEGER 

621 ) 

622 db = SyncDatabase(config={"path": "data.db"}, schema=schema) 

623 ``` 

624 """ 

625 config = config or {} 

626 

627 # Extract schema from config if present and no explicit schema provided 

628 if schema is None and "schema" in config: 

629 schema = AsyncDatabase._extract_schema_from_config(config["schema"]) 

630 # Remove schema from config so backends don't see it 

631 config = {k: v for k, v in config.items() if k != "schema"} 

632 

633 self.config = config 

634 self.schema = schema or DatabaseSchema() 

635 self._initialize() 

636 

637 def _initialize(self) -> None: # noqa: B027 

638 """Initialize the database backend. Override in subclasses if needed.""" 

639 # Default implementation does nothing - backends can override if needed 

640 

641 def _ensure_record_id(self, record: Record, record_id: str) -> Record: 

642 """Ensure a record has its ID set (delegates to utility function).""" 

643 return ensure_record_id(record, record_id) 

644 

645 def _prepare_record_for_storage(self, record: Record) -> tuple[Record, str]: 

646 """Prepare a record for storage by ensuring it has a storage_id. 

647  

648 Args: 

649 record: The record to prepare 

650  

651 Returns: 

652 Tuple of (prepared_record_copy, storage_id) 

653 """ 

654 import uuid 

655 # Make a copy to avoid modifying the original 

656 record_copy = record.copy(deep=True) 

657 

658 # Generate storage ID if not present 

659 if not record_copy.has_storage_id(): 

660 storage_id = str(uuid.uuid4()) 

661 record_copy.storage_id = storage_id 

662 else: 

663 storage_id = record_copy.storage_id 

664 

665 return record_copy, storage_id 

666 

667 def _prepare_record_from_storage(self, record: Record | None, storage_id: str) -> Record | None: 

668 """Prepare a record retrieved from storage by ensuring storage_id is set. 

669  

670 Args: 

671 record: The record retrieved from storage (or None) 

672 storage_id: The storage ID used to retrieve the record 

673  

674 Returns: 

675 Record with storage_id set, or None if record was None 

676 """ 

677 if record: 

678 record_copy = record.copy(deep=True) 

679 # Ensure storage_id is set 

680 if not record_copy.has_storage_id(): 

681 record_copy.storage_id = storage_id 

682 return record_copy 

683 return None 

684 

685 def _process_search_results( 

686 self, 

687 results: list[tuple[str, Record]], 

688 query: Query, 

689 deep_copy: bool = True 

690 ) -> list[Record]: 

691 """Process search results (delegates to utility function).""" 

692 return process_search_results(results, query, deep_copy) 

693 

694 def set_schema(self, schema: DatabaseSchema) -> None: 

695 """Set the database schema. 

696  

697 Args: 

698 schema: The database schema to use 

699 """ 

700 self.schema = schema 

701 

702 def add_field_schema(self, field_schema: FieldSchema) -> None: 

703 """Add a field to the database schema. 

704  

705 Args: 

706 field_schema: The field schema to add 

707 """ 

708 self.schema.add_field(field_schema) 

709 

710 def with_schema(self, **field_definitions) -> SyncDatabase: 

711 """Set schema using field definitions. 

712  

713 Returns self for chaining. 

714  

715 Examples: 

716 db = SyncMemoryDatabase().with_schema( 

717 content=FieldType.TEXT, 

718 embedding=(FieldType.VECTOR, {"dimensions": 384, "source_field": "content"}) 

719 ) 

720 """ 

721 self.schema = DatabaseSchema.create(**field_definitions) 

722 return self 

723 

724 @abstractmethod 

725 def create(self, record: Record) -> str: 

726 """Create a new record in the database. 

727 

728 Args: 

729 record: The record to create 

730 

731 Returns: 

732 The ID of the created record 

733 

734 Example: 

735 ```python 

736 record = Record({"name": "Alice", "age": 30}) 

737 record_id = db.create(record) 

738 print(record_id) # "550e8400-e29b-41d4-a716-446655440000" 

739 ``` 

740 """ 

741 raise NotImplementedError 

742 

743 @abstractmethod 

744 def read(self, id: str) -> Record | None: 

745 """Read a record by ID. 

746 

747 Args: 

748 id: The record ID 

749 

750 Returns: 

751 The record if found, None otherwise 

752 

753 Example: 

754 ```python 

755 record = db.read("550e8400-e29b-41d4-a716-446655440000") 

756 if record: 

757 print(record.get_value("name")) # "Alice" 

758 ``` 

759 """ 

760 raise NotImplementedError 

761 

762 @abstractmethod 

763 def update(self, id: str, record: Record) -> bool: 

764 """Update an existing record. 

765 

766 Args: 

767 id: The record ID 

768 record: The updated record 

769 

770 Returns: 

771 True if the record was updated, False if not found 

772 

773 Example: 

774 ```python 

775 updated_record = Record({"name": "Alice", "age": 31}) 

776 success = db.update(record_id, updated_record) 

777 print(success) # True 

778 ``` 

779 """ 

780 raise NotImplementedError 

781 

782 @abstractmethod 

783 def delete(self, id: str) -> bool: 

784 """Delete a record by ID. 

785 

786 Args: 

787 id: The record ID 

788 

789 Returns: 

790 True if the record was deleted, False if not found 

791 

792 Example: 

793 ```python 

794 success = db.delete(record_id) 

795 print(success) # True 

796 ``` 

797 """ 

798 raise NotImplementedError 

799 

800 @abstractmethod 

801 def search(self, query: Query | ComplexQuery) -> list[Record]: 

802 """Search for records matching a query (simple or complex). 

803 

804 Args: 

805 query: The search query 

806 

807 Returns: 

808 List of matching records 

809 

810 Example: 

811 ```python 

812 # Simple query 

813 query = Query(filters=[Filter("age", Operator.GT, 25)]) 

814 results = db.search(query) 

815 

816 # Complex query with boolean logic 

817 from dataknobs_data.query_logic import QueryBuilder, LogicOperator 

818 

819 complex_query = ( 

820 QueryBuilder() 

821 .where("age", Operator.GT, 25) 

822 .and_where("name", Operator.LIKE, "A%") 

823 .build() 

824 ) 

825 results = db.search(complex_query) 

826 ``` 

827 """ 

828 raise NotImplementedError 

829 

830 def all(self) -> list[Record]: 

831 """Get all records from the database. 

832  

833 Returns: 

834 List of all records 

835 """ 

836 # Default implementation using search with empty query 

837 from .query import Query 

838 return self.search(Query()) 

839 

    def _search_with_complex_query(self, query: ComplexQuery) -> list[Record]:
        """Default implementation for ComplexQuery using in-memory filtering.

        Backends can override this for native boolean logic support.

        Args:
            query: Complex query with boolean logic

        Returns:
            List of matching records
        """
        # Fast path: if the complex query reduces to a simple Query, let the
        # backend's search() handle it (to_simple_query raises ValueError
        # when the boolean structure cannot be flattened).
        try:
            simple_query = query.to_simple_query()
            return self.search(simple_query)
        except ValueError:
            # Can't convert - need to do in-memory filtering.
            # Fetch everything with an empty Query, then filter locally.
            all_records = self.search(Query())

            # Keep only records satisfying the complex boolean condition.
            results = []
            for record in all_records:
                if query.matches(record):
                    results.append(record)

            # Apply sorting: specs are applied in reverse so the FIRST spec
            # ends up as the primary sort key (relies on sort stability).
            if query.sort_specs:
                for sort_spec in reversed(query.sort_specs):
                    reverse = sort_spec.order.value == "desc"
                    results.sort(
                        # NOTE(review): missing fields default to "" here; if a
                        # field holds non-string values this comparison could
                        # raise TypeError - confirm fields are homogeneous.
                        key=lambda r: r.get_value(sort_spec.field, ""),
                        reverse=reverse
                    )

            # Apply offset and limit (pagination) after sorting.
            if query.offset_value:
                results = results[query.offset_value:]
            if query.limit_value:
                results = results[:query.limit_value]

            # Apply field projection last, on the final page of results.
            if query.fields:
                results = [r.project(query.fields) for r in results]

            return results

886 

887 @abstractmethod 

888 def exists(self, id: str) -> bool: 

889 """Check if a record exists.""" 

890 raise NotImplementedError 

891 

892 def upsert(self, id_or_record: str | Record, record: Record | None = None) -> str: 

893 """Update or insert a record. 

894  

895 Can be called as: 

896 - upsert(id, record) - explicit ID and record 

897 - upsert(record) - extract ID from record using Record's built-in logic 

898  

899 Args: 

900 id_or_record: Either an ID string or a Record 

901 record: The record to upsert (if first arg is ID) 

902  

903 Returns: 

904 The record ID 

905 """ 

906 import uuid 

907 

908 # Determine ID and record based on arguments 

909 if isinstance(id_or_record, str): 

910 # Called with explicit ID: upsert(id, record) 

911 id = id_or_record 

912 if record is None: 

913 raise ValueError("Record required when ID is provided") 

914 else: 

915 # Called with just record: upsert(record) 

916 record = id_or_record 

917 # Use Record's built-in ID property which handles all the logic 

918 id = record.id 

919 

920 if id is None: 

921 # Generate a new ID if none found 

922 id = str(uuid.uuid4()) # type: ignore[unreachable] 

923 # Set it on the record for future reference 

924 record.storage_id = id 

925 

926 # Now perform the upsert 

927 if self.exists(id): 

928 self.update(id, record) 

929 else: 

930 # Ensure the record has the storage_id set for create 

931 if not record.storage_id: 

932 record.storage_id = id 

933 created_id = self.create(record) 

934 # Return the created ID (might be different from what we provided) 

935 return created_id or id 

936 return id 

937 

938 def create_batch(self, records: list[Record]) -> list[str]: 

939 """Create multiple records in batch.""" 

940 ids = [] 

941 for record in records: 

942 id = self.create(record) 

943 ids.append(id) 

944 return ids 

945 

946 def read_batch(self, ids: list[str]) -> list[Record | None]: 

947 """Read multiple records by ID.""" 

948 records = [] 

949 for id in ids: 

950 record = self.read(id) 

951 records.append(record) 

952 return records 

953 

954 def delete_batch(self, ids: list[str]) -> list[bool]: 

955 """Delete multiple records by ID.""" 

956 results = [] 

957 for id in ids: 

958 result = self.delete(id) 

959 results.append(result) 

960 return results 

961 

962 def update_batch(self, updates: list[tuple[str, Record]]) -> list[bool]: 

963 """Update multiple records. 

964 

965 Default implementation calls update() for each ID/record pair. 

966 Override for better performance. 

967 

968 Args: 

969 updates: List of (id, record) tuples to update 

970 

971 Returns: 

972 List of success flags for each update 

973 """ 

974 results = [] 

975 for id, record in updates: 

976 result = self.update(id, record) 

977 results.append(result) 

978 return results 

979 

980 def count(self, query: Query | None = None) -> int: 

981 """Count records matching a query.""" 

982 if query: 

983 results = self.search(query) 

984 return len(results) 

985 else: 

986 return self._count_all() 

987 

988 @abstractmethod 

989 def _count_all(self) -> int: 

990 """Count all records in the database.""" 

991 raise NotImplementedError 

992 

993 def clear(self) -> int: 

994 """Clear all records from the database.""" 

995 raise NotImplementedError 

996 

    def connect(self) -> None:  # noqa: B027
        """Connect to the database. Override in subclasses if needed."""
        # Default implementation does nothing - many backends don't need explicit connection

    def close(self) -> None:  # noqa: B027
        """Close the database connection. Override in subclasses if needed."""
        # Default implementation does nothing - many backends don't need explicit closing

    def disconnect(self) -> None:
        """Disconnect from the database (alias for close)."""
        self.close()

    def __enter__(self):
        """Context manager entry: connect, then return self."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: always close; exceptions propagate."""
        # Returning None (falsy) means exceptions are not suppressed.
        self.close()

1017 

    @abstractmethod
    def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> Iterator[Record]:
        """Stream records from database.

        Yields records one at a time, fetching in batches internally.

        Args:
            query: Optional query to filter records
            config: Streaming configuration

        Yields:
            Records matching the query
        """
        raise NotImplementedError

    @abstractmethod
    def stream_write(
        self,
        records: Iterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into database.

        Accepts an iterator and writes in batches.

        Args:
            records: Iterator of records to write
            config: Streaming configuration

        Returns:
            Result of the streaming operation
        """
        raise NotImplementedError

1055 

1056 def stream_transform( 

1057 self, 

1058 query: Query | None = None, 

1059 transform: Callable[[Record], Record | None] | None = None, 

1060 config: StreamConfig | None = None 

1061 ) -> Iterator[Record]: 

1062 """Stream records through a transformation. 

1063  

1064 Default implementation, can be overridden for efficiency. 

1065  

1066 Args: 

1067 query: Optional query to filter records 

1068 transform: Optional transformation function 

1069 config: Streaming configuration 

1070  

1071 Yields: 

1072 Transformed records 

1073 """ 

1074 for record in self.stream_read(query, config): 

1075 if transform: 

1076 transformed = transform(record) 

1077 if transformed: # None means filter out 

1078 yield transformed 

1079 else: 

1080 yield record 

1081 

1082 @classmethod 

1083 def from_backend(cls, backend: str, config: dict[str, Any] | None = None) -> SyncDatabase: 

1084 """Factory method to create and connect a synchronous database instance. 

1085 

1086 Args: 

1087 backend: The backend type ("memory", "file", "s3", "postgres", "elasticsearch") 

1088 config: Backend-specific configuration 

1089 

1090 Returns: 

1091 Connected SyncDatabase instance 

1092 """ 

1093 from .backends import SYNC_BACKEND_REGISTRY 

1094 

1095 backend_class = SYNC_BACKEND_REGISTRY.get(backend) 

1096 if not backend_class: 

1097 raise ValueError( 

1098 f"Unknown backend: {backend}. Available: {list(SYNC_BACKEND_REGISTRY.keys())}" 

1099 ) 

1100 

1101 instance = backend_class(config) 

1102 instance.connect() 

1103 return instance