Coverage for src / dataknobs_data / database_utils.py: 15%

27 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-26 15:45 -0700

1"""Utility functions for database operations.""" 

2 

3 

4from .query import Query 

5from .records import Record 

6 

7 

def ensure_record_id(record: Record, record_id: str) -> Record:
    """Return a record whose storage ID equals ``record_id``.

    Backends call this when handing records back from storage operations
    (e.g. search) so that every returned record carries its storage ID.

    Args:
        record: The record to check.
        record_id: The storage ID the returned record must carry.

    Returns:
        ``record`` itself when it already has ``record_id`` as its storage
        ID; otherwise a deep copy of it with ``storage_id`` set to
        ``record_id`` (the record passed in is left unmodified).
    """
    # Fast path: the ID is already in place, so no copy is needed.
    if record.has_storage_id() and record.storage_id == record_id:
        return record

    # Tag a deep copy rather than the caller's object.
    tagged = record.copy(deep=True)
    tagged.storage_id = record_id
    return tagged

25 

26 

def process_search_results(
    results: list[tuple[str, Record]],
    query: Query,
    deep_copy: bool = True
) -> list[Record]:
    """Process search results with standard operations.

    Helper method for backends to process search results consistently:
    1. Applies sorting
    2. Applies offset and limit
    3. Ensures the surviving records have their IDs set
    4. Returns deep copies if requested
    5. Applies field projection

    The ``results`` list passed in is never reordered or otherwise modified;
    sorting is done on a shallow copy.

    Args:
        results: List of (id, record) tuples
        query: The query with sorting, pagination, and projection specs
        deep_copy: Whether to return deep copies of records

    Returns:
        Processed list of records
    """
    ordered = results

    # Apply sorting on a private copy so the caller's list keeps its order.
    if query.sort_specs:
        ordered = list(results)
        # list.sort is stable, so sorting by the least significant key first
        # and the most significant key last yields a multi-key ordering.
        for sort_spec in reversed(query.sort_specs):
            descending = sort_spec.order.value == "desc"
            if sort_spec.field == 'id':
                # The ID lives in the tuple, not in the record's fields.
                ordered.sort(key=lambda pair: pair[0] or "", reverse=descending)
            else:
                # Missing values sort as the empty string.
                ordered.sort(
                    key=lambda pair, name=sort_spec.field: pair[1].get_value(name, ""),
                    reverse=descending,
                )

    # Apply offset and limit before any copying, so records that are paged
    # out are never ID-checked or deep-copied.
    # NOTE(review): both checks are truthiness tests, so a limit of 0 is
    # treated as "no limit" -- confirm that this is intended.
    if query.offset_value:
        ordered = ordered[query.offset_value:]
    if query.limit_value:
        ordered = ordered[:query.limit_value]

    # Extract records and ensure they have their IDs
    records = []
    for record_id, record in ordered:
        processed_record = ensure_record_id(record, record_id)
        # ensure_record_id returns either the same object or a fresh deep
        # copy; only copy here when it handed back the original object.
        if deep_copy and processed_record is record:
            processed_record = processed_record.copy(deep=True)
        records.append(processed_record)

    # Apply field projection
    if query.fields:
        records = [record.project(query.fields) for record in records]

    return records

82 

83 return records