Coverage for src/dataknobs_data/migration/migration.py: 18%

68 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-31 15:06 -0600

1"""Migration definition with reversible operations. 

2""" 

3 

4from __future__ import annotations 

5from typing import TYPE_CHECKING 

6 

7if TYPE_CHECKING: 

8 from dataknobs_data.records import Record 

9 from .operations import Operation 

10 

11 

class Migration:
    """Migration between data versions with reversible operations.

    Provides a clean API for defining and applying migrations with
    support for rollback via operation reversal.

    Operations are duck-typed: each is expected to expose
    ``apply(record)`` and ``reverse(record)`` methods that return a record.
    """

    def __init__(self, from_version: str, to_version: str, description: str | None = None):
        """Initialize migration.

        Args:
            from_version: Source version identifier
            to_version: Target version identifier
            description: Optional migration description
        """
        self.from_version = from_version
        self.to_version = to_version
        self.description = description
        self.operations: list[Operation] = []

    def add(self, operation: Operation) -> Migration:
        """Add an operation to the migration (fluent API).

        Args:
            operation: Operation to add

        Returns:
            Self for chaining
        """
        self.operations.append(operation)
        return self

    def add_many(self, operations: list[Operation]) -> Migration:
        """Add multiple operations (fluent API).

        Operations are appended in the given order via ``list.extend``.

        Args:
            operations: List of operations to add

        Returns:
            Self for chaining
        """
        self.operations.extend(operations)
        return self

    def apply(self, record: Record, reverse: bool = False) -> Record:
        """Apply migration to a record.

        Forward application calls each operation's ``apply`` in insertion
        order; reverse application calls each operation's ``reverse`` from
        last to first, undoing a forward run.  Afterwards the resulting
        record's ``metadata["version"]`` is set to ``to_version`` (forward)
        or ``from_version`` (reverse).

        Note: the version is written in place on the record returned by the
        last operation -- with no operations that is the input record itself.

        Args:
            record: Record to migrate
            reverse: If True, apply operations in reverse

        Returns:
            Migrated record
        """
        result = record

        if reverse:
            # Undo in the opposite order the operations were applied.
            for operation in reversed(self.operations):
                result = operation.reverse(result)
            result.metadata["version"] = self.from_version
        else:
            for operation in self.operations:
                result = operation.apply(result)
            result.metadata["version"] = self.to_version

        return result

    def apply_many(self, records: list[Record], reverse: bool = False) -> list[Record]:
        """Apply migration to multiple records.

        Args:
            records: List of records to migrate
            reverse: If True, apply operations in reverse

        Returns:
            List of migrated records, in input order
        """
        return [self.apply(record, reverse) for record in records]

    def can_reverse(self) -> bool:
        """Check if this migration can be reversed.

        All operations must support reversal for the migration to be
        reversible; an operation qualifies when it exposes a callable
        ``reverse`` attribute.  An empty migration is trivially reversible.

        Returns:
            True if migration can be reversed
        """
        return all(callable(getattr(op, "reverse", None)) for op in self.operations)

    def get_affected_fields(self) -> set[str]:
        """Get set of field names affected by this migration.

        Field names are discovered from operation attributes: ``field_name``;
        otherwise ``old_name`` (plus ``new_name`` when present) for renames;
        otherwise the ``operations`` of a composite, walked recursively so
        nested composites are included.

        Returns:
            Set of field names that will be modified
        """
        affected: set[str] = set()
        for operation in self.operations:
            self._collect_affected_fields(operation, affected)
        return affected

    @staticmethod
    def _collect_affected_fields(operation: Operation, affected: set[str]) -> None:
        """Add the field names touched by ``operation`` (recursively) to ``affected``."""
        if hasattr(operation, "field_name"):
            affected.add(operation.field_name)
        elif hasattr(operation, "old_name"):
            affected.add(operation.old_name)
            if hasattr(operation, "new_name"):
                affected.add(operation.new_name)
        elif hasattr(operation, "operations"):
            # Composite operation - descend into every level of nesting.
            for sub_op in operation.operations:
                Migration._collect_affected_fields(sub_op, affected)

    def validate(self, record: Record) -> tuple[bool, list[str]]:
        """Validate if a record can be migrated.

        Checks that the record's ``metadata["version"]`` (when set) equals
        ``from_version`` and that the fields read by rename and transform
        operations exist.  Operations are walked in application order and
        composites (objects exposing ``operations``) are checked recursively,
        so a field introduced by an earlier rename is visible to later
        operations and a field renamed away is not.

        Args:
            record: Record to validate

        Returns:
            Tuple of (is_valid, list_of_issues)
        """
        issues: list[str] = []

        # Check current version (a missing or empty version is not a mismatch)
        current_version = record.metadata.get("version")
        if current_version and current_version != self.from_version:
            issues.append(
                f"Record version mismatch: expected {self.from_version}, got {current_version}"
            )

        # Name changes made by the renames seen so far; untouched names are
        # still looked up directly in ``record.fields``.
        renamed_away: set[str] = set()
        renamed_in: set[str] = set()

        def has_field(name: str) -> bool:
            if name in renamed_in:
                return True
            return name not in renamed_away and name in record.fields

        def check(operation: Operation) -> None:
            if hasattr(operation, "old_name"):
                # RenameField operation
                old_name = operation.old_name
                if not has_field(old_name):
                    issues.append(f"Field '{old_name}' not found for rename operation")
                # Track the rename even if the source is missing, so a single
                # root cause is not re-reported by later operations.
                renamed_in.discard(old_name)
                renamed_away.add(old_name)
                new_name = getattr(operation, "new_name", None)
                if new_name is not None:
                    renamed_away.discard(new_name)
                    renamed_in.add(new_name)
            elif hasattr(operation, "field_name") and operation.__class__.__name__ == "TransformField":
                # TransformField operation
                if not has_field(operation.field_name):
                    issues.append(
                        f"Field '{operation.field_name}' not found for transform operation"
                    )
            elif hasattr(operation, "operations"):
                # Composite operation - validate its sub-operations in order.
                for sub_op in operation.operations:
                    check(sub_op)

        for operation in self.operations:
            check(operation)

        return not issues, issues

    def __repr__(self) -> str:
        """String representation."""
        return (
            f"Migration(from='{self.from_version}', to='{self.to_version}', "
            f"operations={len(self.operations)})"
        )

    def __str__(self) -> str:
        """Human-readable string: header line, then one line per operation."""
        header = f"Migration from {self.from_version} to {self.to_version}"
        if self.description:
            header += f": {self.description}"
        lines = [header, f" Operations ({len(self.operations)}):"]
        lines.extend(f" {i}. {op}" for i, op in enumerate(self.operations, 1))
        return "\n".join(lines)