Coverage for src/dataknobs_data/migration_v2/migration.py: 68%

69 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-15 12:29 -0500

1""" 

2Migration definition with reversible operations. 

3""" 

4 

5from typing import List, Optional 

6 

7from dataknobs_data.records import Record 

8 

9from .operations import Operation 

10 

11 

class Migration:
    """
    Migration between data versions with reversible operations.

    Provides a clean API for defining and applying migrations with
    support for rollback via operation reversal.
    """

    def __init__(self, from_version: str, to_version: str, description: Optional[str] = None):
        """
        Initialize migration.

        Args:
            from_version: Source version identifier
            to_version: Target version identifier
            description: Optional migration description
        """
        self.from_version = from_version
        self.to_version = to_version
        self.description = description
        # Operations in forward-application order; reversal walks this list backwards.
        self.operations: List['Operation'] = []

    def add(self, operation: 'Operation') -> 'Migration':
        """
        Add an operation to the migration (fluent API).

        Args:
            operation: Operation to add

        Returns:
            Self for chaining
        """
        self.operations.append(operation)
        return self

    def add_many(self, operations: List['Operation']) -> 'Migration':
        """
        Add multiple operations (fluent API).

        Args:
            operations: List of operations to add, appended in order

        Returns:
            Self for chaining
        """
        self.operations.extend(operations)
        return self

    def apply(self, record: 'Record', reverse: bool = False) -> 'Record':
        """
        Apply migration to a record.

        Forward mode calls each operation's ``apply`` in insertion order and
        then sets ``metadata["version"]`` to ``to_version``. Reverse mode calls
        each operation's ``reverse`` in the opposite order and then sets
        ``metadata["version"]`` to ``from_version``.

        The version is written onto whatever record the last operation
        returned. With no operations, that is ``record`` itself, so the
        caller's record is modified in place. Whether individual operations
        copy or mutate their input is defined in ``.operations`` and cannot
        be seen from here.

        Args:
            record: Record to migrate
            reverse: If True, undo the migration instead of applying it

        Returns:
            Migrated record
        """
        result = record

        if reverse:
            # Undo in the opposite order so each step sees the state it produced.
            for operation in reversed(self.operations):
                result = operation.reverse(result)
            result.metadata["version"] = self.from_version
        else:
            for operation in self.operations:
                result = operation.apply(result)
            result.metadata["version"] = self.to_version

        return result

    def apply_many(self, records: List['Record'], reverse: bool = False) -> List['Record']:
        """
        Apply migration to multiple records.

        Args:
            records: List of records to migrate
            reverse: If True, apply operations in reverse

        Returns:
            List of migrated records, in the same order as ``records``
        """
        return [self.apply(record, reverse) for record in records]

    def can_reverse(self) -> bool:
        """
        Check if this migration can be reversed.

        All operations must support reversal for the migration to be
        reversible. An operation qualifies when it exposes a callable
        ``reverse`` attribute, which is what ``apply(reverse=True)`` invokes.

        Returns:
            True if migration can be reversed (also True when it is empty)
        """
        return all(callable(getattr(op, 'reverse', None)) for op in self.operations)

    def get_affected_fields(self) -> set[str]:
        """
        Get set of field names affected by this migration.

        Composite operations (anything with an ``operations`` attribute) are
        walked recursively, so fields touched inside nested composites are
        included.

        Returns:
            Set of field names that will be modified
        """
        affected: set[str] = set()
        for operation in self.operations:
            self._collect_affected_fields(operation, affected)
        return affected

    def _collect_affected_fields(self, operation: 'Operation', affected: set) -> None:
        """Add the field names touched by ``operation`` (recursively) to ``affected``."""
        if hasattr(operation, 'field_name'):
            affected.add(operation.field_name)
        elif hasattr(operation, 'old_name'):
            # Rename-style operation: both the source and target names are affected.
            affected.add(operation.old_name)
            if hasattr(operation, 'new_name'):
                affected.add(operation.new_name)
        elif hasattr(operation, 'operations'):
            # Composite operation: descend to any depth.
            for sub_op in operation.operations:
                self._collect_affected_fields(sub_op, affected)

    def validate(self, record: 'Record') -> tuple[bool, List[str]]:
        """
        Validate if a record can be migrated.

        Checks the record's version (when present) against ``from_version``.
        It then walks the operations in order, checking that each rename
        source and each ``TransformField`` target exists at the point that
        operation would run. Fields renamed by earlier operations are taken
        into account.

        Args:
            record: Record to validate

        Returns:
            Tuple of (is_valid, list_of_issues)
        """
        issues: List[str] = []

        # Check current version; a record without a version is not flagged.
        current_version = record.metadata.get("version")
        if current_version and current_version != self.from_version:
            issues.append(
                f"Record version mismatch: expected {self.from_version}, got {current_version}"
            )

        # Names moved by earlier renames, so later checks see the record as it
        # will look when each operation runs rather than its original shape.
        renamed_away: set[str] = set()
        renamed_into: set[str] = set()

        def field_present(name: str) -> bool:
            if name in renamed_into:
                return True
            return name in record.fields and name not in renamed_away

        for operation in self.operations:
            if hasattr(operation, 'old_name'):
                # RenameField operation
                if not field_present(operation.old_name):
                    issues.append(f"Field '{operation.old_name}' not found for rename operation")
                renamed_into.discard(operation.old_name)
                renamed_away.add(operation.old_name)
                new_name = getattr(operation, 'new_name', None)
                if new_name is not None:
                    renamed_away.discard(new_name)
                    renamed_into.add(new_name)
            elif hasattr(operation, 'field_name') and operation.__class__.__name__ == 'TransformField':
                # TransformField operation
                if not field_present(operation.field_name):
                    issues.append(f"Field '{operation.field_name}' not found for transform operation")
                # NOTE(review): fields introduced by add-style operations are not
                # tracked here, so transforming a freshly added field is still flagged.

        return not issues, issues

    def __repr__(self) -> str:
        """String representation."""
        return (
            f"Migration(from='{self.from_version}', to='{self.to_version}', "
            f"operations={len(self.operations)})"
        )

    def __str__(self) -> str:
        """Human-readable string: a header line, then one numbered line per operation."""
        header = f"Migration from {self.from_version} to {self.to_version}"
        if self.description:
            header += f": {self.description}"
        lines = [header, f" Operations ({len(self.operations)}):"]
        lines.extend(f" {i}. {op}" for i, op in enumerate(self.operations, 1))
        return "\n".join(lines)