Coverage for src/dataknobs_data/migration/migration.py: 18%
68 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
"""Migration definition with reversible operations.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

# These names are used only in annotations; the TYPE_CHECKING guard keeps
# the imports from executing at runtime.
if TYPE_CHECKING:
    from dataknobs_data.records import Record

    from .operations import Operation
class Migration:
    """Migration between data versions with reversible operations.

    Provides a clean API for defining and applying migrations with
    support for rollback via operation reversal.

    Attributes:
        from_version: Source version identifier.
        to_version: Target version identifier.
        description: Optional migration description.
        operations: Operations in the order they run for a forward migration.
    """

    def __init__(self, from_version: str, to_version: str, description: str | None = None):
        """Initialize migration.

        Args:
            from_version: Source version identifier
            to_version: Target version identifier
            description: Optional migration description
        """
        self.from_version = from_version
        self.to_version = to_version
        self.description = description
        self.operations: list[Operation] = []

    def add(self, operation: Operation) -> Migration:
        """Add an operation to the migration (fluent API).

        Args:
            operation: Operation to add

        Returns:
            Self for chaining
        """
        self.operations.append(operation)
        return self

    def add_many(self, operations: list[Operation]) -> Migration:
        """Add multiple operations (fluent API).

        Args:
            operations: List of operations to add, appended in the given order

        Returns:
            Self for chaining
        """
        self.operations.extend(operations)
        return self

    def apply(self, record: Record, reverse: bool = False) -> Record:
        """Apply migration to a record.

        Forward application calls ``apply`` on every operation in insertion
        order; reverse application calls ``reverse`` on every operation in the
        opposite order. The ``"version"`` metadata entry is then written on
        whatever object the last operation returned. With no operations that
        object is ``record`` itself, so the input is updated in place.

        Args:
            record: Record to migrate
            reverse: If True, apply operations in reverse

        Returns:
            Migrated record
        """
        result = record

        if reverse:
            # Undo the most recently applied operation first.
            for operation in reversed(self.operations):
                result = operation.reverse(result)
            target_version = self.from_version
        else:
            for operation in self.operations:
                result = operation.apply(result)
            target_version = self.to_version

        # Update version metadata
        result.metadata["version"] = target_version

        return result

    def apply_many(self, records: list[Record], reverse: bool = False) -> list[Record]:
        """Apply migration to multiple records.

        Args:
            records: List of records to migrate
            reverse: If True, apply operations in reverse

        Returns:
            List of migrated records, in the same order as ``records``
        """
        return [self.apply(record, reverse) for record in records]

    def can_reverse(self) -> bool:
        """Check if this migration can be reversed.

        All operations must support reversal for the migration to be
        reversible. An operation counts as reversible when it exposes a
        callable ``reverse`` attribute, which is what ``apply`` invokes for a
        reverse run. A migration without operations is trivially reversible.

        Returns:
            True if migration can be reversed
        """
        return all(
            callable(getattr(operation, "reverse", None))
            for operation in self.operations
        )

    def get_affected_fields(self) -> set[str]:
        """Get set of field names affected by this migration.

        Composite operations (those exposing an ``operations`` attribute) are
        walked recursively, so fields touched by nested composites are
        included as well.

        Returns:
            Set of field names that will be modified
        """
        affected: set[str] = set()
        self._collect_affected_fields(self.operations, affected)
        return affected

    @staticmethod
    def _collect_affected_fields(operations: list[Operation], affected: set[str]) -> None:
        """Add the field names touched by ``operations`` to ``affected``."""
        for operation in operations:
            # Extract field names based on operation type. The checks are
            # ordered: a ``field_name`` wins over rename attributes, which win
            # over being treated as a composite.
            if hasattr(operation, "field_name"):
                affected.add(operation.field_name)
            elif hasattr(operation, "old_name"):
                affected.add(operation.old_name)
                if hasattr(operation, "new_name"):
                    affected.add(operation.new_name)
            elif hasattr(operation, "operations"):
                # Composite operation - descend into its sub-operations.
                Migration._collect_affected_fields(operation.operations, affected)

    def validate(self, record: Record) -> tuple[bool, list[str]]:
        """Validate if a record can be migrated.

        A record whose ``"version"`` metadata is missing or falsy is not
        treated as a version mismatch. Only top-level operations are
        inspected for required fields.

        Args:
            record: Record to validate

        Returns:
            Tuple of (is_valid, list_of_issues)
        """
        issues: list[str] = []

        # Check current version
        current_version = record.metadata.get("version")
        if current_version and current_version != self.from_version:
            issues.append(
                f"Record version mismatch: expected {self.from_version}, got {current_version}"
            )

        # Check for required fields for operations
        for operation in self.operations:
            if hasattr(operation, "old_name"):
                # RenameField operation
                if operation.old_name not in record.fields:
                    issues.append(f"Field '{operation.old_name}' not found for rename operation")
            elif hasattr(operation, "field_name") and type(operation).__name__ == "TransformField":
                # TransformField operation, matched by class name so the
                # operations module is not needed at runtime.
                if operation.field_name not in record.fields:
                    issues.append(f"Field '{operation.field_name}' not found for transform operation")

        return not issues, issues

    def __repr__(self) -> str:
        """String representation."""
        return (
            f"Migration(from='{self.from_version}', to='{self.to_version}', "
            f"operations={len(self.operations)})"
        )

    def __str__(self) -> str:
        """Human-readable string."""
        header = f"Migration from {self.from_version} to {self.to_version}"
        if self.description:
            header += f": {self.description}"

        lines = [header, f" Operations ({len(self.operations)}):"]
        lines.extend(f" {i}. {op}" for i, op in enumerate(self.operations, 1))
        return "\n".join(lines)