Coverage for src/dataknobs_data/migration_v2/migration.py: 68%
69 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:29 -0500
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:29 -0500
1"""
2Migration definition with reversible operations.
3"""
5from typing import List, Optional
7from dataknobs_data.records import Record
9from .operations import Operation
class Migration:
    """
    Migration between data versions with reversible operations.

    A migration is an ordered list of operations plus the version
    identifiers it moves a record between. Applying it forward runs each
    operation's ``apply`` in insertion order and tags the record with
    ``to_version``; applying it in reverse runs each operation's
    ``reverse`` in the opposite order and tags the record with
    ``from_version``.
    """

    def __init__(self, from_version: str, to_version: str, description: Optional[str] = None):
        """
        Initialize migration.

        Args:
            from_version: Source version identifier
            to_version: Target version identifier
            description: Optional migration description
        """
        self.from_version = from_version
        self.to_version = to_version
        self.description = description
        # Operations in the order they are applied on a forward migration.
        self.operations: List[Operation] = []

    def add(self, operation: Operation) -> 'Migration':
        """
        Add an operation to the migration (fluent API).

        Args:
            operation: Operation to add

        Returns:
            Self for chaining
        """
        self.operations.append(operation)
        return self

    def add_many(self, operations: List[Operation]) -> 'Migration':
        """
        Add multiple operations (fluent API).

        Args:
            operations: List of operations to add, in application order

        Returns:
            Self for chaining
        """
        self.operations.extend(operations)
        return self

    def apply(self, record: Record, reverse: bool = False) -> Record:
        """
        Apply migration to a record.

        Note that the version tag is written onto whatever object the last
        operation returned. If the migration has no operations, or the
        operations return the object they were given, the record passed in
        is the one whose metadata gets updated.

        Args:
            record: Record to migrate
            reverse: If True, undo the migration: run each operation's
                ``reverse`` method, last operation first

        Returns:
            Migrated record, with ``metadata["version"]`` set to
            ``to_version`` (forward) or ``from_version`` (reverse)
        """
        result = record

        if reverse:
            # Undo in the opposite order the operations were applied.
            for operation in reversed(self.operations):
                result = operation.reverse(result)
            version = self.from_version
        else:
            for operation in self.operations:
                result = operation.apply(result)
            version = self.to_version

        # Update version metadata
        result.metadata["version"] = version
        return result

    def apply_many(self, records: List[Record], reverse: bool = False) -> List[Record]:
        """
        Apply migration to multiple records.

        Args:
            records: List of records to migrate
            reverse: If True, apply operations in reverse

        Returns:
            List of migrated records, in the same order as the input
        """
        return [self.apply(record, reverse) for record in records]

    def can_reverse(self) -> bool:
        """
        Check if this migration can be reversed.

        All operations must support reversal for the migration to be
        reversible. An operation counts as reversible when it exposes a
        callable ``reverse`` attribute, which is what ``apply`` invokes
        when called with ``reverse=True``.

        Returns:
            True if every operation can be reversed (trivially True for a
            migration with no operations)
        """
        return all(
            callable(getattr(operation, 'reverse', None))
            for operation in self.operations
        )

    def get_affected_fields(self) -> set[str]:
        """
        Get set of field names affected by this migration.

        Field names are discovered by attribute: ``field_name``, or
        ``old_name`` together with ``new_name``, or for composite
        operations the fields of every nested operation at any depth.

        Returns:
            Set of field names that will be modified
        """
        affected: set[str] = set()
        for operation in self.operations:
            self._collect_fields(operation, affected)
        return affected

    @staticmethod
    def _collect_fields(operation: Operation, affected: set[str]) -> None:
        """Add the field names touched by ``operation`` to ``affected``."""
        if hasattr(operation, 'field_name'):
            affected.add(operation.field_name)
        elif hasattr(operation, 'old_name'):
            affected.add(operation.old_name)
            if hasattr(operation, 'new_name'):
                affected.add(operation.new_name)
        elif hasattr(operation, 'operations'):
            # Composite operation - descend so nested composites are covered too
            for sub_op in operation.operations:
                Migration._collect_fields(sub_op, affected)

    def validate(self, record: Record) -> tuple[bool, List[str]]:
        """
        Validate if a record can be migrated.

        Every check is made against the record as given; the effect of
        earlier operations on later ones is not simulated, and operations
        nested inside a composite are not inspected.

        Args:
            record: Record to validate

        Returns:
            Tuple of (is_valid, list_of_issues)
        """
        issues = []

        # Check current version; a record with no (or a falsy) version is
        # accepted without comparison.
        current_version = record.metadata.get("version")
        if current_version and current_version != self.from_version:
            issues.append(
                f"Record version mismatch: expected {self.from_version}, got {current_version}"
            )

        # Check for required fields for operations
        for operation in self.operations:
            if hasattr(operation, 'old_name'):
                # Rename-style operation: the source field must exist
                if operation.old_name not in record.fields:
                    issues.append(f"Field '{operation.old_name}' not found for rename operation")
            elif hasattr(operation, 'field_name') and operation.__class__.__name__ == 'TransformField':
                # TransformField operation: the field being transformed must exist
                if operation.field_name not in record.fields:
                    issues.append(f"Field '{operation.field_name}' not found for transform operation")

        return len(issues) == 0, issues

    def __repr__(self) -> str:
        """String representation."""
        return (
            f"Migration(from='{self.from_version}', to='{self.to_version}', "
            f"operations={len(self.operations)})"
        )

    def __str__(self) -> str:
        """Human-readable string listing the versions and numbered operations."""
        parts = [f"Migration from {self.from_version} to {self.to_version}"]
        if self.description:
            parts.append(f": {self.description}")
        parts.append(f"\n Operations ({len(self.operations)}):")
        parts.extend(f"\n {i}. {op}" for i, op in enumerate(self.operations, 1))
        return "".join(parts)