Coverage for src / dataknobs_data / database_utils.py: 15%
27 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
1"""Utility functions for database operations."""
4from .query import Query
5from .records import Record
def ensure_record_id(record: Record, record_id: str) -> Record:
    """Return a record whose storage ID equals ``record_id``.

    Backends call this when handing records back from storage operations
    (such as search) so that every returned record carries its storage ID.

    If the record already reports a storage ID and it matches ``record_id``,
    the same object is returned untouched. Otherwise the record is
    deep-copied and the ID is set on the copy, so the input object is never
    modified.

    Args:
        record: The record to check
        record_id: The storage ID that should be set on the record

    Returns:
        The record with storage_id guaranteed to be set
    """
    needs_id = not record.has_storage_id() or record.storage_id != record_id
    if not needs_id:
        return record

    # Set the ID on a deep copy so the caller's object stays as it was.
    tagged = record.copy(deep=True)
    tagged.storage_id = record_id
    return tagged
def process_search_results(
    results: list[tuple[str, Record]],
    query: Query,
    deep_copy: bool = True
) -> list[Record]:
    """Process search results with standard operations.

    Helper method for backends to process search results consistently:
        1. Applies sorting
        2. Applies offset and limit
        3. Ensures the remaining records have their IDs set
        4. Returns deep copies if requested
        5. Applies field projection

    The caller's ``results`` list is never reordered or otherwise modified,
    and only records that survive pagination are copied.

    Args:
        results: List of (id, record) tuples
        query: The query with sorting, pagination, and projection specs
        deep_copy: Whether to return deep copies of records

    Returns:
        Processed list of records
    """
    # Work on a shallow copy so sorting does not reorder the caller's list.
    ordered = list(results)

    # Apply sorting. Specs are applied least-significant first; list.sort is
    # stable, so the first spec ends up as the primary ordering.
    if query.sort_specs:
        for sort_spec in reversed(query.sort_specs):
            descending = sort_spec.order.value == "desc"
            if sort_spec.field == 'id':
                # 'id' is not a record field: sort on the storage ID that
                # accompanies each record in the tuple.
                ordered.sort(key=lambda item: item[0] or "", reverse=descending)
            else:
                ordered.sort(
                    key=lambda item, field=sort_spec.field: item[1].get_value(field, ""),
                    reverse=descending,
                )

    # Apply offset and limit on the (id, record) pairs *before* copying, so
    # records that fall outside the requested page are never deep-copied.
    # NOTE(review): the truthiness checks mean a limit of 0 is treated as
    # "no limit" -- confirm that is the intended contract.
    if query.offset_value:
        ordered = ordered[query.offset_value:]
    if query.limit_value:
        ordered = ordered[:query.limit_value]

    # Extract records and ensure they have their IDs
    records = []
    for record_id, record in ordered:
        processed_record = ensure_record_id(record, record_id)
        # ensure_record_id returns a fresh deep copy whenever it has to set
        # the ID; a second copy is only needed when it handed back the
        # original object unchanged.
        if deep_copy and processed_record is record:
            processed_record = processed_record.copy(deep=True)
        records.append(processed_record)

    # Apply field projection
    if query.fields:
        records = [record.project(query.fields) for record in records]

    return records