Coverage for src/dataknobs_data/database.py: 26%
279 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-13 11:36 -0700
1"""Database abstraction layer providing unified interfaces for data operations.
3This module defines abstract base classes for synchronous and asynchronous database
4operations, supporting CRUD, querying, streaming, and schema management across
5different backend database implementations.
6"""
8from __future__ import annotations
10from abc import ABC, abstractmethod
11from typing import Any, TYPE_CHECKING
13from .database_utils import ensure_record_id, process_search_results
14from .query import Query
15from .schema import DatabaseSchema, FieldSchema
17if TYPE_CHECKING:
18 from collections.abc import AsyncIterator, Callable, Iterator
19 from .query_logic import ComplexQuery
20 from .records import Record
21 from .streaming import StreamConfig, StreamResult
class AsyncDatabase(ABC):
    """Abstract base class for async database implementations.

    Provides a unified async interface for CRUD operations, querying, and streaming
    across different backend databases. Supports schema validation, batch operations,
    and complex queries with boolean logic.

    Example:
        ```python
        from dataknobs_data import async_database_factory, Record, Query, Filter, Operator

        # Create async database
        db = async_database_factory("memory")

        # Use as async context manager
        async with db:
            # Create records
            id1 = await db.create(Record({"name": "Alice", "age": 30}))
            id2 = await db.create(Record({"name": "Bob", "age": 25}))

            # Query records
            query = Query(filters=[Filter("age", Operator.GT, 25)])
            results = await db.search(query)
            print(results)  # [Alice's record]

            # Update record
            await db.update(id1, Record({"name": "Alice", "age": 31}))

            # Stream large datasets
            async for record in db.stream_read():
                process_record(record)
        ```
    """

    def __init__(self, config: dict[str, Any] | None = None, schema: DatabaseSchema | None = None):
        """Initialize the database with optional configuration.

        Args:
            config: Backend-specific configuration parameters (may include 'schema' key)
            schema: Optional database schema (overrides config schema)

        Example:
            ```python
            from dataknobs_data import AsyncDatabase
            from dataknobs_data.schema import DatabaseSchema
            from dataknobs_data.fields import FieldType

            # With schema
            schema = DatabaseSchema.create(
                name=FieldType.STRING,
                age=FieldType.INTEGER
            )
            db = AsyncDatabase(config={"path": "data.db"}, schema=schema)
            ```
        """
        config = config or {}

        # Extract schema from config if present and no explicit schema provided
        if schema is None and "schema" in config:
            schema = self._extract_schema_from_config(config["schema"])
            # Remove schema from config so backends don't see it
            config = {k: v for k, v in config.items() if k != "schema"}

        self.config = config
        self.schema = schema or DatabaseSchema()
        self._initialize()

    @staticmethod
    def _extract_schema_from_config(schema_config: Any) -> DatabaseSchema | None:
        """Extract schema from configuration.

        Args:
            schema_config: Can be a DatabaseSchema, dict, or None

        Returns:
            DatabaseSchema instance or None
        """
        if isinstance(schema_config, DatabaseSchema):
            return schema_config
        elif isinstance(schema_config, dict):
            return DatabaseSchema.from_dict(schema_config)
        return None

    def _initialize(self) -> None:  # noqa: B027
        """Initialize the database backend. Override in subclasses if needed."""
        # Default implementation does nothing - backends can override if needed

    def _ensure_record_id(self, record: Record, record_id: str) -> Record:
        """Ensure a record has its ID set (delegates to utility function)."""
        return ensure_record_id(record, record_id)

    def _prepare_record_for_storage(self, record: Record) -> tuple[Record, str]:
        """Prepare a record for storage by ensuring it has a storage_id.

        Args:
            record: The record to prepare

        Returns:
            Tuple of (prepared_record_copy, storage_id)
        """
        import uuid
        # Make a copy to avoid modifying the original
        record_copy = record.copy(deep=True)

        # Generate storage ID if not present
        if not record_copy.has_storage_id():
            storage_id = str(uuid.uuid4())
            record_copy.storage_id = storage_id
        else:
            storage_id = record_copy.storage_id

        return record_copy, storage_id

    def _prepare_record_from_storage(self, record: Record | None, storage_id: str) -> Record | None:
        """Prepare a record retrieved from storage by ensuring storage_id is set.

        Args:
            record: The record retrieved from storage (or None)
            storage_id: The storage ID used to retrieve the record

        Returns:
            Record with storage_id set, or None if record was None
        """
        if record:
            record_copy = record.copy(deep=True)
            # Ensure storage_id is set
            if not record_copy.has_storage_id():
                record_copy.storage_id = storage_id
            return record_copy
        return None

    def _process_search_results(
        self,
        results: list[tuple[str, Record]],
        query: Query,
        deep_copy: bool = True
    ) -> list[Record]:
        """Process search results (delegates to utility function)."""
        return process_search_results(results, query, deep_copy)

    def set_schema(self, schema: DatabaseSchema) -> None:
        """Set the database schema, replacing any existing one.

        Args:
            schema: The database schema to use
        """
        self.schema = schema

    def add_field_schema(self, field_schema: FieldSchema) -> None:
        """Add a field to the database schema.

        Args:
            field_schema: The field schema to add
        """
        self.schema.add_field(field_schema)

    def with_schema(self, **field_definitions: Any) -> AsyncDatabase:
        """Set schema using field definitions (replaces any existing schema).

        Returns self for chaining.

        Examples:
            db = AsyncMemoryDatabase().with_schema(
                content=FieldType.TEXT,
                embedding=(FieldType.VECTOR, {"dimensions": 384, "source_field": "content"})
            )
        """
        self.schema = DatabaseSchema.create(**field_definitions)
        return self

    @abstractmethod
    async def create(self, record: Record) -> str:
        """Create a new record in the database.

        Args:
            record: The record to create

        Returns:
            The ID of the created record
        """
        raise NotImplementedError

    @abstractmethod
    async def read(self, id: str) -> Record | None:
        """Read a record by ID.

        Args:
            id: The record ID

        Returns:
            The record if found, None otherwise
        """
        raise NotImplementedError

    @abstractmethod
    async def update(self, id: str, record: Record) -> bool:
        """Update an existing record.

        Args:
            id: The record ID
            record: The updated record

        Returns:
            True if the record was updated, False if not found
        """
        raise NotImplementedError

    @abstractmethod
    async def delete(self, id: str) -> bool:
        """Delete a record by ID.

        Args:
            id: The record ID

        Returns:
            True if the record was deleted, False if not found
        """
        raise NotImplementedError

    @abstractmethod
    async def search(self, query: Query | ComplexQuery) -> list[Record]:
        """Search for records matching a query.

        Args:
            query: The search query (simple or complex)

        Returns:
            List of matching records
        """
        raise NotImplementedError

    async def all(self) -> list[Record]:
        """Get all records from the database.

        Returns:
            List of all records
        """
        # Default implementation using search with empty query
        from .query import Query
        return await self.search(Query())

    async def _search_with_complex_query(self, query: ComplexQuery) -> list[Record]:
        """Default implementation for ComplexQuery using in-memory filtering.

        Backends can override this for native boolean logic support.

        Args:
            query: Complex query with boolean logic

        Returns:
            List of matching records
        """
        # Try to convert to simple query if possible
        try:
            simple_query = query.to_simple_query()
            return await self.search(simple_query)
        except ValueError:
            # Can't convert - need to do in-memory filtering
            # Get all records (or use a base filter if possible)
            all_records = await self.search(Query())

            # Apply complex condition filtering
            results = []
            for record in all_records:
                if query.matches(record):
                    results.append(record)

            # Apply sorting: iterate sort specs in reverse so the first spec
            # becomes the primary key (stable sort preserves earlier passes).
            # The lambda captures the loop variable, but sort() runs
            # immediately within each iteration, so late binding is harmless.
            if query.sort_specs:
                for sort_spec in reversed(query.sort_specs):
                    reverse = sort_spec.order.value == "desc"
                    results.sort(
                        key=lambda r: r.get_value(sort_spec.field, ""),
                        reverse=reverse
                    )

            # Apply offset and limit
            if query.offset_value:
                results = results[query.offset_value:]
            if query.limit_value:
                results = results[:query.limit_value]

            # Apply field projection
            if query.fields:
                results = [r.project(query.fields) for r in results]

            return results

    @abstractmethod
    async def exists(self, id: str) -> bool:
        """Check if a record exists.

        Args:
            id: The record ID

        Returns:
            True if the record exists, False otherwise
        """
        raise NotImplementedError

    async def upsert(self, id_or_record: str | Record, record: Record | None = None) -> str:
        """Update or insert a record.

        Can be called as:
        - upsert(id, record) - explicit ID and record
        - upsert(record) - extract ID from record using Record's built-in logic

        Args:
            id_or_record: Either an ID string or a Record
            record: The record to upsert (if first arg is ID)

        Returns:
            The record ID

        Raises:
            ValueError: If an ID string is given without a record.
        """
        import uuid

        # Determine ID and record based on arguments
        if isinstance(id_or_record, str):
            # Called with explicit ID: upsert(id, record)
            id = id_or_record
            if record is None:
                raise ValueError("Record required when ID is provided")
        else:
            # Called with just record: upsert(record)
            record = id_or_record
            # Use Record's built-in ID property which handles all the logic
            id = record.id

        if id is None:
            # Generate a new ID if none found
            id = str(uuid.uuid4())  # type: ignore[unreachable]
            # Set it on the record for future reference
            record.storage_id = id

        # Now perform the upsert
        if await self.exists(id):
            await self.update(id, record)
        else:
            # Ensure the record has the storage_id set for create
            if not record.storage_id:
                record.storage_id = id
            created_id = await self.create(record)
            # Return the created ID (might be different from what we provided)
            return created_id or id
        return id

    async def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records in batch.

        Args:
            records: List of records to create

        Returns:
            List of created record IDs
        """
        ids = []
        for record in records:
            id = await self.create(record)
            ids.append(id)
        return ids

    async def read_batch(self, ids: list[str]) -> list[Record | None]:
        """Read multiple records by ID.

        Args:
            ids: List of record IDs

        Returns:
            List of records (None for not found)
        """
        records = []
        for id in ids:
            record = await self.read(id)
            records.append(record)
        return records

    async def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records by ID.

        Args:
            ids: List of record IDs

        Returns:
            List of deletion results
        """
        results = []
        for id in ids:
            result = await self.delete(id)
            results.append(result)
        return results

    async def update_batch(self, updates: list[tuple[str, Record]]) -> list[bool]:
        """Update multiple records.

        Default implementation calls update() for each ID/record pair.
        Override for better performance.

        Args:
            updates: List of (id, record) tuples to update

        Returns:
            List of success flags for each update
        """
        results = []
        for id, record in updates:
            result = await self.update(id, record)
            results.append(result)
        return results

    async def count(self, query: Query | None = None) -> int:
        """Count records matching a query.

        Args:
            query: Optional search query (counts all if None)

        Returns:
            Number of matching records
        """
        # NOTE(review): this tests truthiness, not `query is not None` — if an
        # empty Query is falsy this silently falls through to _count_all();
        # confirm Query has no custom __bool__/__len__.
        if query:
            results = await self.search(query)
            return len(results)
        else:
            return await self._count_all()

    @abstractmethod
    async def _count_all(self) -> int:
        """Count all records in the database."""
        raise NotImplementedError

    async def clear(self) -> int:
        """Clear all records from the database.

        Returns:
            Number of records deleted
        """
        # NOTE(review): intentionally not @abstractmethod, so subclasses are
        # not forced to implement it; callers get NotImplementedError at runtime.
        raise NotImplementedError

    async def connect(self) -> None:  # noqa: B027
        """Connect to the database. Override in subclasses if needed."""
        # Default implementation does nothing - many backends don't need explicit connection

    async def close(self) -> None:  # noqa: B027
        """Close the database connection. Override in subclasses if needed."""
        # Default implementation does nothing - many backends don't need explicit closing

    async def disconnect(self) -> None:
        """Disconnect from the database (alias for close)."""
        await self.close()

    async def __aenter__(self) -> AsyncDatabase:
        """Async context manager entry: connect and return self."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit: close the connection."""
        await self.close()

    @abstractmethod
    async def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> AsyncIterator[Record]:
        """Stream records from database.

        Yields records one at a time, fetching in batches internally.

        Args:
            query: Optional query to filter records
            config: Streaming configuration

        Yields:
            Records matching the query
        """
        raise NotImplementedError

    @abstractmethod
    async def stream_write(
        self,
        records: AsyncIterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into database.

        Accepts an iterator and writes in batches.

        Args:
            records: Iterator of records to write
            config: Streaming configuration

        Returns:
            Result of the streaming operation
        """
        raise NotImplementedError

    async def stream_transform(
        self,
        query: Query | None = None,
        transform: Callable[[Record], Record | None] | None = None,
        config: StreamConfig | None = None
    ) -> AsyncIterator[Record]:
        """Stream records through a transformation.

        Default implementation, can be overridden for efficiency.

        Args:
            query: Optional query to filter records
            transform: Optional transformation function
            config: Streaming configuration

        Yields:
            Transformed records
        """
        async for record in self.stream_read(query, config):
            if transform:
                transformed = transform(record)
                if transformed:  # None means filter out
                    yield transformed
            else:
                yield record

    @classmethod
    async def from_backend(cls, backend: str, config: dict[str, Any] | None = None) -> AsyncDatabase:
        """Factory method to create and connect a database instance.

        Args:
            backend: The backend type ("memory", "file", "s3", "postgres", "elasticsearch")
            config: Backend-specific configuration

        Returns:
            Connected AsyncDatabase instance

        Raises:
            ValueError: If the backend name is not registered.
        """
        from .backends import BACKEND_REGISTRY

        backend_class = BACKEND_REGISTRY.get(backend)
        if not backend_class:
            raise ValueError(
                f"Unknown backend: {backend}. Available: {list(BACKEND_REGISTRY.keys())}"
            )

        instance = backend_class(config)
        await instance.connect()
        return instance
class SyncDatabase(ABC):
    """Synchronous variant of the Database abstract base class.

    Provides a unified synchronous interface for CRUD operations, querying, and streaming
    across different backend databases. Supports schema validation, batch operations,
    and complex queries with boolean logic.

    Example:
        ```python
        from dataknobs_data import database_factory, Record, Query, Filter, Operator

        # Create database
        db = database_factory("memory")

        # Use as context manager
        with db:
            # Create records
            id1 = db.create(Record({"name": "Alice", "age": 30}))
            id2 = db.create(Record({"name": "Bob", "age": 25}))

            # Query records
            query = Query(filters=[Filter("age", Operator.GT, 25)])
            results = db.search(query)
            print(results)  # [Alice's record]

            # Update record
            db.update(id1, Record({"name": "Alice", "age": 31}))

            # Stream large datasets
            for record in db.stream_read():
                process_record(record)
        ```
    """

    def __init__(self, config: dict[str, Any] | None = None, schema: DatabaseSchema | None = None):
        """Initialize the database with optional configuration.

        Args:
            config: Backend-specific configuration parameters (may include 'schema' key)
            schema: Optional database schema (overrides config schema)

        Example:
            ```python
            from dataknobs_data import SyncDatabase
            from dataknobs_data.schema import DatabaseSchema
            from dataknobs_data.fields import FieldType

            # With schema
            schema = DatabaseSchema.create(
                name=FieldType.STRING,
                age=FieldType.INTEGER
            )
            db = SyncDatabase(config={"path": "data.db"}, schema=schema)
            ```
        """
        config = config or {}

        # Extract schema from config if present and no explicit schema provided
        # (reuses the async class's static helper to keep the logic in one place)
        if schema is None and "schema" in config:
            schema = AsyncDatabase._extract_schema_from_config(config["schema"])
            # Remove schema from config so backends don't see it
            config = {k: v for k, v in config.items() if k != "schema"}

        self.config = config
        self.schema = schema or DatabaseSchema()
        self._initialize()

    def _initialize(self) -> None:  # noqa: B027
        """Initialize the database backend. Override in subclasses if needed."""
        # Default implementation does nothing - backends can override if needed

    def _ensure_record_id(self, record: Record, record_id: str) -> Record:
        """Ensure a record has its ID set (delegates to utility function)."""
        return ensure_record_id(record, record_id)

    def _prepare_record_for_storage(self, record: Record) -> tuple[Record, str]:
        """Prepare a record for storage by ensuring it has a storage_id.

        Args:
            record: The record to prepare

        Returns:
            Tuple of (prepared_record_copy, storage_id)
        """
        import uuid
        # Make a copy to avoid modifying the original
        record_copy = record.copy(deep=True)

        # Generate storage ID if not present
        if not record_copy.has_storage_id():
            storage_id = str(uuid.uuid4())
            record_copy.storage_id = storage_id
        else:
            storage_id = record_copy.storage_id

        return record_copy, storage_id

    def _prepare_record_from_storage(self, record: Record | None, storage_id: str) -> Record | None:
        """Prepare a record retrieved from storage by ensuring storage_id is set.

        Args:
            record: The record retrieved from storage (or None)
            storage_id: The storage ID used to retrieve the record

        Returns:
            Record with storage_id set, or None if record was None
        """
        if record:
            record_copy = record.copy(deep=True)
            # Ensure storage_id is set
            if not record_copy.has_storage_id():
                record_copy.storage_id = storage_id
            return record_copy
        return None

    def _process_search_results(
        self,
        results: list[tuple[str, Record]],
        query: Query,
        deep_copy: bool = True
    ) -> list[Record]:
        """Process search results (delegates to utility function)."""
        return process_search_results(results, query, deep_copy)

    def set_schema(self, schema: DatabaseSchema) -> None:
        """Set the database schema, replacing any existing one.

        Args:
            schema: The database schema to use
        """
        self.schema = schema

    def add_field_schema(self, field_schema: FieldSchema) -> None:
        """Add a field to the database schema.

        Args:
            field_schema: The field schema to add
        """
        self.schema.add_field(field_schema)

    def with_schema(self, **field_definitions: Any) -> SyncDatabase:
        """Set schema using field definitions (replaces any existing schema).

        Returns self for chaining.

        Examples:
            db = SyncMemoryDatabase().with_schema(
                content=FieldType.TEXT,
                embedding=(FieldType.VECTOR, {"dimensions": 384, "source_field": "content"})
            )
        """
        self.schema = DatabaseSchema.create(**field_definitions)
        return self

    @abstractmethod
    def create(self, record: Record) -> str:
        """Create a new record in the database.

        Args:
            record: The record to create

        Returns:
            The ID of the created record

        Example:
            ```python
            record = Record({"name": "Alice", "age": 30})
            record_id = db.create(record)
            print(record_id)  # "550e8400-e29b-41d4-a716-446655440000"
            ```
        """
        raise NotImplementedError

    @abstractmethod
    def read(self, id: str) -> Record | None:
        """Read a record by ID.

        Args:
            id: The record ID

        Returns:
            The record if found, None otherwise

        Example:
            ```python
            record = db.read("550e8400-e29b-41d4-a716-446655440000")
            if record:
                print(record.get_value("name"))  # "Alice"
            ```
        """
        raise NotImplementedError

    @abstractmethod
    def update(self, id: str, record: Record) -> bool:
        """Update an existing record.

        Args:
            id: The record ID
            record: The updated record

        Returns:
            True if the record was updated, False if not found

        Example:
            ```python
            updated_record = Record({"name": "Alice", "age": 31})
            success = db.update(record_id, updated_record)
            print(success)  # True
            ```
        """
        raise NotImplementedError

    @abstractmethod
    def delete(self, id: str) -> bool:
        """Delete a record by ID.

        Args:
            id: The record ID

        Returns:
            True if the record was deleted, False if not found

        Example:
            ```python
            success = db.delete(record_id)
            print(success)  # True
            ```
        """
        raise NotImplementedError

    @abstractmethod
    def search(self, query: Query | ComplexQuery) -> list[Record]:
        """Search for records matching a query (simple or complex).

        Args:
            query: The search query

        Returns:
            List of matching records

        Example:
            ```python
            # Simple query
            query = Query(filters=[Filter("age", Operator.GT, 25)])
            results = db.search(query)

            # Complex query with boolean logic
            from dataknobs_data.query_logic import QueryBuilder, LogicOperator

            complex_query = (
                QueryBuilder()
                .where("age", Operator.GT, 25)
                .and_where("name", Operator.LIKE, "A%")
                .build()
            )
            results = db.search(complex_query)
            ```
        """
        raise NotImplementedError

    def all(self) -> list[Record]:
        """Get all records from the database.

        Returns:
            List of all records
        """
        # Default implementation using search with empty query
        from .query import Query
        return self.search(Query())

    def _search_with_complex_query(self, query: ComplexQuery) -> list[Record]:
        """Default implementation for ComplexQuery using in-memory filtering.

        Backends can override this for native boolean logic support.

        Args:
            query: Complex query with boolean logic

        Returns:
            List of matching records
        """
        # Try to convert to simple query if possible
        try:
            simple_query = query.to_simple_query()
            return self.search(simple_query)
        except ValueError:
            # Can't convert - need to do in-memory filtering
            # Get all records (or use a base filter if possible)
            all_records = self.search(Query())

            # Apply complex condition filtering
            results = []
            for record in all_records:
                if query.matches(record):
                    results.append(record)

            # Apply sorting: iterate sort specs in reverse so the first spec
            # becomes the primary key (stable sort preserves earlier passes).
            # The lambda captures the loop variable, but sort() runs
            # immediately within each iteration, so late binding is harmless.
            if query.sort_specs:
                for sort_spec in reversed(query.sort_specs):
                    reverse = sort_spec.order.value == "desc"
                    results.sort(
                        key=lambda r: r.get_value(sort_spec.field, ""),
                        reverse=reverse
                    )

            # Apply offset and limit
            if query.offset_value:
                results = results[query.offset_value:]
            if query.limit_value:
                results = results[:query.limit_value]

            # Apply field projection
            if query.fields:
                results = [r.project(query.fields) for r in results]

            return results

    @abstractmethod
    def exists(self, id: str) -> bool:
        """Check if a record exists.

        Args:
            id: The record ID

        Returns:
            True if the record exists, False otherwise
        """
        raise NotImplementedError

    def upsert(self, id_or_record: str | Record, record: Record | None = None) -> str:
        """Update or insert a record.

        Can be called as:
        - upsert(id, record) - explicit ID and record
        - upsert(record) - extract ID from record using Record's built-in logic

        Args:
            id_or_record: Either an ID string or a Record
            record: The record to upsert (if first arg is ID)

        Returns:
            The record ID

        Raises:
            ValueError: If an ID string is given without a record.
        """
        import uuid

        # Determine ID and record based on arguments
        if isinstance(id_or_record, str):
            # Called with explicit ID: upsert(id, record)
            id = id_or_record
            if record is None:
                raise ValueError("Record required when ID is provided")
        else:
            # Called with just record: upsert(record)
            record = id_or_record
            # Use Record's built-in ID property which handles all the logic
            id = record.id

        if id is None:
            # Generate a new ID if none found
            id = str(uuid.uuid4())  # type: ignore[unreachable]
            # Set it on the record for future reference
            record.storage_id = id

        # Now perform the upsert
        if self.exists(id):
            self.update(id, record)
        else:
            # Ensure the record has the storage_id set for create
            if not record.storage_id:
                record.storage_id = id
            created_id = self.create(record)
            # Return the created ID (might be different from what we provided)
            return created_id or id
        return id

    def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records in batch.

        Args:
            records: List of records to create

        Returns:
            List of created record IDs
        """
        ids = []
        for record in records:
            id = self.create(record)
            ids.append(id)
        return ids

    def read_batch(self, ids: list[str]) -> list[Record | None]:
        """Read multiple records by ID.

        Args:
            ids: List of record IDs

        Returns:
            List of records (None for not found)
        """
        records = []
        for id in ids:
            record = self.read(id)
            records.append(record)
        return records

    def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records by ID.

        Args:
            ids: List of record IDs

        Returns:
            List of deletion results
        """
        results = []
        for id in ids:
            result = self.delete(id)
            results.append(result)
        return results

    def update_batch(self, updates: list[tuple[str, Record]]) -> list[bool]:
        """Update multiple records.

        Default implementation calls update() for each ID/record pair.
        Override for better performance.

        Args:
            updates: List of (id, record) tuples to update

        Returns:
            List of success flags for each update
        """
        results = []
        for id, record in updates:
            result = self.update(id, record)
            results.append(result)
        return results

    def count(self, query: Query | None = None) -> int:
        """Count records matching a query (counts all if query is None).

        Args:
            query: Optional search query

        Returns:
            Number of matching records
        """
        # NOTE(review): this tests truthiness, not `query is not None` — if an
        # empty Query is falsy this silently falls through to _count_all();
        # confirm Query has no custom __bool__/__len__.
        if query:
            results = self.search(query)
            return len(results)
        else:
            return self._count_all()

    @abstractmethod
    def _count_all(self) -> int:
        """Count all records in the database."""
        raise NotImplementedError

    def clear(self) -> int:
        """Clear all records from the database.

        Returns:
            Number of records deleted
        """
        # NOTE(review): intentionally not @abstractmethod, so subclasses are
        # not forced to implement it; callers get NotImplementedError at runtime.
        raise NotImplementedError

    def connect(self) -> None:  # noqa: B027
        """Connect to the database. Override in subclasses if needed."""
        # Default implementation does nothing - many backends don't need explicit connection

    def close(self) -> None:  # noqa: B027
        """Close the database connection. Override in subclasses if needed."""
        # Default implementation does nothing - many backends don't need explicit closing

    def disconnect(self) -> None:
        """Disconnect from the database (alias for close)."""
        self.close()

    def __enter__(self) -> SyncDatabase:
        """Context manager entry: connect and return self."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit: close the connection."""
        self.close()

    @abstractmethod
    def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> Iterator[Record]:
        """Stream records from database.

        Yields records one at a time, fetching in batches internally.

        Args:
            query: Optional query to filter records
            config: Streaming configuration

        Yields:
            Records matching the query
        """
        raise NotImplementedError

    @abstractmethod
    def stream_write(
        self,
        records: Iterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into database.

        Accepts an iterator and writes in batches.

        Args:
            records: Iterator of records to write
            config: Streaming configuration

        Returns:
            Result of the streaming operation
        """
        raise NotImplementedError

    def stream_transform(
        self,
        query: Query | None = None,
        transform: Callable[[Record], Record | None] | None = None,
        config: StreamConfig | None = None
    ) -> Iterator[Record]:
        """Stream records through a transformation.

        Default implementation, can be overridden for efficiency.

        Args:
            query: Optional query to filter records
            transform: Optional transformation function
            config: Streaming configuration

        Yields:
            Transformed records
        """
        for record in self.stream_read(query, config):
            if transform:
                transformed = transform(record)
                if transformed:  # None means filter out
                    yield transformed
            else:
                yield record

    @classmethod
    def from_backend(cls, backend: str, config: dict[str, Any] | None = None) -> SyncDatabase:
        """Factory method to create and connect a synchronous database instance.

        Args:
            backend: The backend type ("memory", "file", "s3", "postgres", "elasticsearch")
            config: Backend-specific configuration

        Returns:
            Connected SyncDatabase instance

        Raises:
            ValueError: If the backend name is not registered.
        """
        from .backends import SYNC_BACKEND_REGISTRY

        backend_class = SYNC_BACKEND_REGISTRY.get(backend)
        if not backend_class:
            raise ValueError(
                f"Unknown backend: {backend}. Available: {list(SYNC_BACKEND_REGISTRY.keys())}"
            )

        instance = backend_class(config)
        instance.connect()
        return instance