Coverage for src/dataknobs_data/migration_v2/transformer.py: 90%
88 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:29 -0500
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:29 -0500
1"""
2Data transformation with fluent API.
3"""
5from abc import ABC, abstractmethod
6from dataclasses import dataclass
7from typing import Any, Callable, List, Optional, Union
9from dataknobs_data.records import Record
10from dataknobs_data.fields import FieldType
class TransformRule(ABC):
    """Base class for transformation rules.

    Subclasses implement :meth:`apply`. ``Transformer.transform`` applies
    rules in order and stops as soon as a rule returns ``None``, dropping the
    record; a rule may therefore act as a filter as well as a mapper.
    """

    @abstractmethod
    def apply(self, record: Record) -> Optional[Record]:
        """
        Apply this transformation rule to a record.

        Args:
            record: Record to transform

        Returns:
            Transformed record, or None if the record should be filtered out
        """
@dataclass
class MapRule(TransformRule):
    """Copy or move one field's value, optionally passing it through a function.

    Attributes:
        source: Name of the field to read.
        target: Name of the field to write. When it differs from ``source``,
            the source field is removed from the result (i.e. a rename).
        transform: Optional one-argument callable applied to the value. If it
            raises, the original value is written instead and the error text
            is stored in the result's metadata under
            ``_transform_error_<source>``.
    """

    source: str
    target: str
    transform: Optional[Callable[[Any], Any]] = None

    def apply(self, record: Record) -> Record:
        """Return a copy of ``record`` with this mapping applied.

        The copy carries the same fields, a copy of the metadata, and the same
        id. Records that lack the ``source`` field are returned as that plain
        copy.
        """
        mapped = Record(
            data=dict(record.fields),
            metadata=record.metadata.copy(),
            id=record.id,
        )

        # Nothing to map: hand back the untouched copy.
        if self.source not in record.fields:
            return mapped

        original = record.fields[self.source].value
        new_value = original
        if self.transform:
            try:
                new_value = self.transform(original)
            except Exception as exc:
                # Best-effort: note the failure and fall back to the input value.
                mapped.metadata[f"_transform_error_{self.source}"] = str(exc)
                new_value = original

        # A different target means the field is being moved, not updated.
        if self.source != self.target:
            del mapped.fields[self.source]

        mapped.set_field(self.target, new_value)
        return mapped
@dataclass
class ExcludeRule(TransformRule):
    """Exclude specified fields from the record.

    Attributes:
        fields: Names of the fields to drop. A single name passed as a plain
            string is treated as one field name (not as a set of substrings).
    """

    fields: List[str]

    def apply(self, record: Record) -> Record:
        """Return a copy of ``record`` without the excluded fields.

        Metadata is copied and the id is preserved. Field objects that are
        kept are reused as-is rather than copied.
        """
        # Build the lookup once: O(1) membership per field instead of a list
        # scan, and a bare string means one name rather than `in` doing a
        # substring match against it.
        if isinstance(self.fields, str):
            excluded = {self.fields}
        else:
            excluded = set(self.fields)

        result = Record(
            data={},
            metadata=record.metadata.copy(),
            id=record.id
        )

        # Copy all fields except excluded ones
        for field_name, field in record.fields.items():
            if field_name not in excluded:
                result.fields[field_name] = field

        return result
@dataclass
class AddRule(TransformRule):
    """Set a field to a fixed value or to a value computed from the record.

    Attributes:
        field_name: Name of the field to set.
        value: A static value, or any callable. Callables are invoked with the
            input record and their return value is stored.
        field_type: Optional type forwarded to ``Record.set_field``.
    """

    field_name: str
    value: Union[Any, Callable[[Record], Any]]
    field_type: Optional[FieldType] = None

    def apply(self, record: Record) -> Record:
        """Return a copy of ``record`` with ``field_name`` set.

        If computing the value raises, the field is set to None and the error
        text is stored in the copy's metadata under
        ``_compute_error_<field_name>``.
        """
        output = Record(
            data=dict(record.fields),
            metadata=record.metadata.copy(),
            id=record.id,
        )

        new_value = self.value
        if callable(new_value):
            try:
                new_value = new_value(record)
            except Exception as exc:
                # Best-effort: keep the record, store None, and say why.
                output.metadata[f"_compute_error_{self.field_name}"] = str(exc)
                new_value = None

        output.set_field(self.field_name, new_value, field_type=self.field_type)
        return output
class Transformer:
    """
    Ordered, chainable collection of record transformation rules.

    The builder methods (``map``, ``rename``, ``exclude``, ``add``,
    ``add_rule``, ``clear``) modify this instance and return it, so calls can
    be chained. ``transform`` applies the rules in insertion order and keeps
    no per-record state between calls, so one transformer can be reused
    during migrations or other data processing.

    Note: ``__len__`` is defined and ``__bool__`` is not, so a transformer
    with no rules evaluates as falsy.
    """

    def __init__(self):
        """Start with an empty rule list."""
        self.rules: List[TransformRule] = []

    def map(
        self,
        source: str,
        target: Optional[str] = None,
        transform: Optional[Callable[[Any], Any]] = None
    ) -> 'Transformer':
        """
        Queue a field mapping, optionally transforming its value (fluent API).

        Args:
            source: Field to read.
            target: Field to write. Any falsy value (None or "") means the
                value is written back to ``source``.
            transform: Optional callable applied to the field's value.

        Returns:
            This transformer, for chaining.
        """
        destination = target if target else source
        rule = MapRule(source=source, target=destination, transform=transform)
        self.rules.append(rule)
        return self

    def rename(self, old_name: str, new_name: str) -> 'Transformer':
        """
        Queue a rename of one field (fluent API).

        This is a mapping without a value transform.

        Args:
            old_name: Current field name.
            new_name: Name the field should have afterwards.

        Returns:
            This transformer, for chaining.
        """
        return self.map(old_name, new_name)

    def exclude(self, *fields: str) -> 'Transformer':
        """
        Queue removal of one or more fields (fluent API).

        Args:
            *fields: Names of the fields to drop.

        Returns:
            This transformer, for chaining.
        """
        excluded = list(fields)
        self.rules.append(ExcludeRule(excluded))
        return self

    def add(
        self,
        field_name: str,
        value: Union[Any, Callable[[Record], Any]],
        field_type: Optional[FieldType] = None
    ) -> 'Transformer':
        """
        Queue setting a field to a static or computed value (fluent API).

        Args:
            field_name: Name of the field to set.
            value: A static value, or a callable that receives the record.
            field_type: Optional field type for the new field.

        Returns:
            This transformer, for chaining.
        """
        rule = AddRule(field_name=field_name, value=value, field_type=field_type)
        self.rules.append(rule)
        return self

    def add_rule(self, rule: TransformRule) -> 'Transformer':
        """
        Queue a caller-supplied rule (fluent API).

        Args:
            rule: Any ``TransformRule`` implementation.

        Returns:
            This transformer, for chaining.
        """
        self.rules.append(rule)
        return self

    def transform(self, record: Record) -> Optional[Record]:
        """
        Run every queued rule over a single record.

        Args:
            record: Record to transform (None is passed through as None).

        Returns:
            The transformed record, or None when the input was None or a rule
            returned None to filter the record out. Later rules are skipped
            once that happens.
        """
        current = record
        for rule in self.rules:
            if current is None:
                # Either the input was None or the previous rule dropped it.
                break
            current = rule.apply(current)
        return current

    def transform_many(self, records: List[Record]) -> List[Record]:
        """
        Run ``transform`` over each record in order.

        Args:
            records: Records to transform.

        Returns:
            The transformed records, with filtered-out (None) results omitted.
        """
        outcomes = (self.transform(item) for item in records)
        return [outcome for outcome in outcomes if outcome is not None]

    def clear(self) -> 'Transformer':
        """
        Drop every queued rule (fluent API).

        Returns:
            This transformer, for chaining.
        """
        del self.rules[:]
        return self

    def __len__(self) -> int:
        """Return how many rules are queued."""
        return len(self.rules)

    def __repr__(self) -> str:
        """Show the number of queued rules."""
        return f"Transformer(rules={len(self.rules)})"