Coverage for src / dataknobs_data / migration / transformer.py: 39%
84 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
1"""Data transformation with fluent API.
2"""
4from __future__ import annotations
6from abc import ABC, abstractmethod
7from dataclasses import dataclass
8from typing import Any, TYPE_CHECKING
10from dataknobs_data.records import Record
12if TYPE_CHECKING:
13 from collections.abc import Callable
14 from dataknobs_data.fields import FieldType
class TransformRule(ABC):
    """Abstract base for one step of a record transformation pipeline.

    Concrete rules subclass this and implement :meth:`apply`, which takes a
    record and produces the transformed record.
    """

    @abstractmethod
    def apply(self, record: Record) -> Record:
        """Transform a single record according to this rule.

        Args:
            record: The record to transform.

        Returns:
            The record produced by this rule.
        """
        ...
@dataclass
class MapRule(TransformRule):
    """Map a field to another field, optionally transforming the value.

    Attributes:
        source: Name of the field to read.
        target: Name of the field to write. When it differs from ``source``
            the source field is removed from the result (a rename).
        transform: Optional function applied to the source value. If it
            raises, the original value is kept and the error message is
            stored in the result's metadata under ``_transform_error_<source>``.
    """

    source: str
    target: str
    transform: Callable[[Any], Any] | None = None

    def apply(self, record: Record) -> Record:
        """Apply the field mapping.

        Args:
            record: Record to read the source field from.

        Returns:
            A new ``Record`` built from a copy of ``record``'s field mapping
            and metadata, with the mapping applied. If ``source`` is not a
            field of ``record`` the copy is returned without changes.
        """
        # NOTE(review): dict(record.fields) is a shallow copy, so the Field
        # objects are shared with the input record; whether set_field below
        # mutates a shared Field in place depends on Record — confirm.
        result = Record(
            data=dict(record.fields),
            metadata=record.metadata.copy(),
            id=record.id,
        )

        if self.source not in record.fields:
            # Nothing to map.
            return result

        value = record.fields[self.source].value

        # Compare with None explicitly: a callable object that defines a falsy
        # __bool__/__len__ must still be applied.
        if self.transform is not None:
            try:
                value = self.transform(value)
            except Exception as e:
                # Best-effort: if the transform raises, `value` was never
                # rebound, so the original value is kept; record the failure.
                result.metadata[f"_transform_error_{self.source}"] = str(e)

        if self.target != self.source:
            # A rename: drop the old field so only the target remains.
            del result.fields[self.source]

        result.set_field(self.target, value)
        return result
@dataclass
class ExcludeRule(TransformRule):
    """Exclude specified fields from the record.

    Attributes:
        fields: Names of the fields to drop. Names that are not present in
            the record are ignored.
    """

    fields: list[str]

    def apply(self, record: Record) -> Record:
        """Remove excluded fields.

        Args:
            record: Record to copy fields from.

        Returns:
            A new ``Record`` with the same id and a copy of the metadata,
            containing every field of ``record`` whose name is not in
            ``self.fields``. Kept Field objects are inserted by reference,
            not cloned.
        """
        result = Record(
            data={},
            metadata=record.metadata.copy(),
            id=record.id,
        )

        # Build the exclusion set once so each membership test is O(1)
        # instead of scanning the list for every field in the record.
        excluded = set(self.fields)
        for field_name, field in record.fields.items():
            if field_name not in excluded:
                result.fields[field_name] = field

        return result
@dataclass
class AddRule(TransformRule):
    """Add a field whose value is either fixed or computed per record.

    Attributes:
        field_name: Name of the field to set on the result.
        value: A static value, or a callable that receives the input record
            and returns the value to store.
        field_type: Optional type passed through to ``Record.set_field``.
    """

    field_name: str
    value: Any | Callable[[Record], Any]
    field_type: FieldType | None = None

    def apply(self, record: Record) -> Record:
        """Return a copy of ``record`` with ``field_name`` set.

        When ``value`` is callable it is invoked with the input record. If that
        call raises, ``None`` is stored instead and the error message is kept
        in the result's metadata under ``_compute_error_<field_name>``.
        """
        output = Record(
            data=dict(record.fields),
            metadata=record.metadata.copy(),
            id=record.id,
        )

        if not callable(self.value):
            new_value = self.value
        else:
            try:
                new_value = self.value(record)
            except Exception as exc:
                # Best-effort: record the failure rather than raising.
                output.metadata[f"_compute_error_{self.field_name}"] = str(exc)
                new_value = None

        output.set_field(self.field_name, new_value, field_type=self.field_type)
        return output
class Transformer:
    """Stateless record transformer with fluent API.

    Provides a clean, chainable interface for defining record transformations
    that can be applied during migrations or data processing. Rules are held
    in ``self.rules`` and applied in the order they were added.
    """

    def __init__(self) -> None:
        """Initialize transformer with empty rule set."""
        self.rules: list[TransformRule] = []

    def map(
        self,
        source: str,
        target: str | None = None,
        transform: Callable[[Any], Any] | None = None
    ) -> Transformer:
        """Map a field, optionally transforming its value (fluent API).

        Args:
            source: Source field name
            target: Target field name. Defaults to ``source``; any falsy value
                (``None`` or ``""``) falls back to ``source``.
            transform: Optional transformation function

        Returns:
            Self for chaining
        """
        self.rules.append(MapRule(
            source=source,
            target=target or source,
            transform=transform
        ))
        return self

    def rename(self, old_name: str, new_name: str) -> Transformer:
        """Rename a field (fluent API).

        Args:
            old_name: Current field name
            new_name: New field name

        Returns:
            Self for chaining
        """
        return self.map(old_name, new_name)

    def exclude(self, *fields: str) -> Transformer:
        """Exclude fields from the record (fluent API).

        Args:
            *fields: Field names to exclude

        Returns:
            Self for chaining
        """
        self.rules.append(ExcludeRule(list(fields)))
        return self

    def add(
        self,
        field_name: str,
        value: Any | Callable[[Record], Any],
        field_type: FieldType | None = None
    ) -> Transformer:
        """Add a new field (fluent API).

        Args:
            field_name: Name of field to add
            value: Static value, or a callable that receives the record and
                returns the value
            field_type: Optional field type

        Returns:
            Self for chaining
        """
        self.rules.append(AddRule(
            field_name=field_name,
            value=value,
            field_type=field_type
        ))
        return self

    def add_rule(self, rule: TransformRule) -> Transformer:
        """Add a custom transformation rule (fluent API).

        Args:
            rule: Custom transformation rule

        Returns:
            Self for chaining
        """
        self.rules.append(rule)
        return self

    def transform(self, record: Record) -> Record | None:
        """Apply all transformation rules to a record, in insertion order.

        Args:
            record: Record to transform

        Returns:
            Transformed record, or None if a rule returned None to filter the
            record out. Once a rule returns None, the remaining rules are
            skipped.
        """
        result = record
        for rule in self.rules:
            result = rule.apply(result)
            if result is None:
                # Filtered out: later rules expect a Record, never None.
                return None
        return result

    def transform_many(self, records: list[Record]) -> list[Record]:
        """Transform multiple records.

        Args:
            records: List of records to transform

        Returns:
            List of transformed records, in input order (filtered records
            are excluded)
        """
        results = []
        for record in records:
            transformed = self.transform(record)
            if transformed is not None:
                results.append(transformed)
        return results

    def clear(self) -> Transformer:
        """Clear all transformation rules (fluent API).

        Returns:
            Self for chaining
        """
        self.rules.clear()
        return self

    def __len__(self) -> int:
        """Get number of transformation rules."""
        return len(self.rules)

    def __repr__(self) -> str:
        """String representation."""
        return f"Transformer(rules={len(self.rules)})"