Coverage for src/dataknobs_data/migration/transformers.py: 0%
167 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 18:11 -0500
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 18:11 -0500
1"""Data transformation utilities for migrations."""
3import logging
4from dataclasses import dataclass
5from typing import Any, Callable, Dict, List, Optional, Union
7from dataknobs_data.fields import Field
8from dataknobs_data.records import Record
10logger = logging.getLogger(__name__)
@dataclass
class FieldMapping:
    """Mapping between a source field and a target field.

    Attributes:
        source_field: Name of the field the value is read from.
        target_field: Name of the field the value is written to.
        transformer: Optional callable applied to the source value.
        default_value: Fallback returned when the source value is ``None``
            or when the transformer raises. ``None`` means "no default".
    """

    source_field: str
    target_field: str
    transformer: Optional[Callable[[Any], Any]] = None
    default_value: Any = None

    def apply(self, source_value: Any) -> Any:
        """Apply the mapping's transformation to a single field value.

        Args:
            source_value: Value taken from the source field.

        Returns:
            ``default_value`` when ``source_value`` is ``None`` and a default
            is configured; otherwise the transformer's result, or the value
            unchanged when no transformer is set. If the transformer raises,
            a warning is logged and ``default_value`` is returned (which may
            be ``None``).
        """
        # A missing value short-circuits to the default before any transformer
        # runs; with no default configured, None is still passed through below.
        if source_value is None and self.default_value is not None:
            return self.default_value

        if not self.transformer:
            return source_value

        try:
            return self.transformer(source_value)
        except Exception as e:
            # Deliberate best-effort: one bad value must not abort the caller.
            # Lazy %-style arguments avoid building the message when the
            # WARNING level is disabled.
            logger.warning(
                "Transformation failed for %s: %s", self.source_field, e
            )
            return self.default_value
class ValueTransformer:
    """Common value transformation functions.

    All members are static methods. Some convert a value directly
    (``to_int``, ``to_bool``, ...); others are factories that return a
    one-argument callable (``truncate``, ``regex_extract``, ``map_values``,
    ``chain``).
    """

    @staticmethod
    def to_string(value: Any) -> str:
        """Convert any value to string; ``None`` becomes the empty string."""
        if value is None:
            return ""
        return str(value)

    @staticmethod
    def to_int(value: Any) -> Optional[int]:
        """Convert value to integer.

        Strings are stripped first; an empty string yields ``None``. Integer
        strings and ``int`` values are converted exactly. Other numeric input
        (e.g. ``"3.7"``, ``"1e3"``, floats) goes through ``float`` and is
        truncated toward zero.

        Returns:
            The integer, or ``None`` when the value is ``None``, blank,
            unparsable, NaN or infinite.
        """
        if value is None:
            return None
        if isinstance(value, int):
            # Exact path: going through float would lose precision above 2**53
            # and overflow for very large ints. bool maps to 0/1 as before.
            return int(value)
        try:
            if isinstance(value, str):
                # Handle string representations
                value = value.strip()
                if value == "":
                    return None
                try:
                    # Exact parse for plain integer strings of any magnitude.
                    return int(value)
                except ValueError:
                    pass  # Not a plain integer; fall back to float parsing.
            return int(float(value))  # Handle floats
        except (ValueError, TypeError, OverflowError):
            # OverflowError: int(float("inf")) or an out-of-range conversion.
            return None

    @staticmethod
    def to_float(value: Any) -> Optional[float]:
        """Convert value to float.

        Returns:
            The float, or ``None`` when the value is ``None`` or cannot be
            converted (including ints too large for a float).
        """
        if value is None:
            return None
        try:
            return float(value)
        except (ValueError, TypeError, OverflowError):
            return None

    @staticmethod
    def to_bool(value: Any) -> bool:
        """Convert value to boolean.

        Strings are true only when, ignoring case and surrounding whitespace,
        they equal ``true``, ``yes``, ``1`` or ``on``. Other values use normal
        truthiness.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            # Strip so padded input such as " yes " matches, as to_int does.
            return value.strip().lower() in ('true', 'yes', '1', 'on')
        return bool(value)

    @staticmethod
    def parse_json(value: str) -> Any:
        """Parse a JSON string; return the input unchanged if parsing fails."""
        import json
        try:
            return json.loads(value)
        except (json.JSONDecodeError, TypeError):
            return value

    @staticmethod
    def to_json(value: Any) -> str:
        """Convert value to a JSON string, falling back to ``str(value)``."""
        import json
        try:
            return json.dumps(value)
        except (TypeError, ValueError):
            return str(value)

    @staticmethod
    def normalize_string(value: str) -> str:
        """Normalize string (lowercase, strip whitespace).

        Non-string input is converted with ``str`` first.
        """
        if not isinstance(value, str):
            value = str(value)
        return value.lower().strip()

    @staticmethod
    def truncate(max_length: int) -> Callable[[str], str]:
        """Create a truncation transformer.

        Args:
            max_length: Slice bound applied as ``value[:max_length]``.

        Returns:
            A callable that stringifies non-string input and slices it.
        """
        def truncator(value: str) -> str:
            if not isinstance(value, str):
                value = str(value)
            return value[:max_length]
        return truncator

    @staticmethod
    def regex_extract(pattern: str, group: int = 0) -> Callable[[str], Optional[str]]:
        """Create a regex extraction transformer.

        Args:
            pattern: Regular expression, compiled once when the factory runs.
            group: Match group to return (0 is the whole match).

        Returns:
            A callable returning the requested group of the first match found
            by ``search``, or ``None`` when there is no match.
        """
        import re
        compiled_pattern = re.compile(pattern)

        def extractor(value: str) -> Optional[str]:
            if not isinstance(value, str):
                value = str(value)
            match = compiled_pattern.search(value)
            if match:
                return match.group(group)
            return None

        return extractor

    @staticmethod
    def map_values(mapping: Dict[Any, Any], default: Any = None) -> Callable[[Any], Any]:
        """Create a value mapping transformer.

        Args:
            mapping: Lookup table from input value to output value.
            default: Result for values that are not keys of ``mapping``.

        Returns:
            A callable performing the lookup. Unhashable inputs can never be
            keys, so they also yield ``default``.
        """
        def mapper(value: Any) -> Any:
            try:
                return mapping.get(value, default)
            except TypeError:
                # Unhashable value (e.g. list or dict): treat as unmapped.
                return default
        return mapper

    @staticmethod
    def chain(*transformers: Callable[[Any], Any]) -> Callable[[Any], Any]:
        """Chain multiple transformers.

        Returns:
            A callable that feeds the value through each transformer in the
            order given; with no transformers the value is returned unchanged.
        """
        def chained(value: Any) -> Any:
            for transformer in transformers:
                value = transformer(value)
            return value
        return chained
class DataTransformer:
    """Transform records during migration.

    The configuration methods return ``self`` so calls can be chained.
    ``transform`` applies, in order: record filters, field mappings (or a
    copy of all fields when no mapping is configured), field exclusion, a
    metadata copy, and record-level transformers.
    """

    def __init__(self):
        """Initialize data transformer with no mappings, filters or exclusions."""
        self.field_mappings: List[FieldMapping] = []
        self.record_filters: List[Callable[[Record], bool]] = []
        self.record_transformers: List[Callable[[Record], Record]] = []
        self.field_filters: List[str] = []  # Fields to exclude

    def add_field_mapping(
        self,
        source_field: str,
        target_field: Optional[str] = None,
        transformer: Optional[Callable[[Any], Any]] = None,
        default_value: Any = None
    ) -> "DataTransformer":
        """Add a field mapping.

        Args:
            source_field: Source field name
            target_field: Target field name (defaults to source_field)
            transformer: Optional value transformer
            default_value: Default value if source is None

        Returns:
            Self for chaining
        """
        self.field_mappings.append(FieldMapping(
            source_field=source_field,
            # An empty or missing target name falls back to the source name.
            target_field=target_field or source_field,
            transformer=transformer,
            default_value=default_value
        ))
        return self

    def rename_field(self, old_name: str, new_name: str) -> "DataTransformer":
        """Rename a field.

        Implemented as a field mapping with no transformer and no default.

        Args:
            old_name: Current field name
            new_name: New field name

        Returns:
            Self for chaining
        """
        return self.add_field_mapping(old_name, new_name)

    def exclude_fields(self, *field_names: str) -> "DataTransformer":
        """Exclude fields from transformation.

        Exclusion is applied to the output record, so a mapping whose target
        name is excluded is dropped as well.

        Args:
            *field_names: Field names to exclude

        Returns:
            Self for chaining
        """
        self.field_filters.extend(field_names)
        return self

    def add_record_filter(self, filter_func: Callable[[Record], bool]) -> "DataTransformer":
        """Add a record filter.

        Records that don't pass the filter will be skipped.

        Args:
            filter_func: Function that returns True to keep the record

        Returns:
            Self for chaining
        """
        self.record_filters.append(filter_func)
        return self

    def add_record_transformer(self, transformer: Callable[[Record], Record]) -> "DataTransformer":
        """Add a record-level transformer.

        Args:
            transformer: Function that transforms the entire record; returning
                None from it drops the record.

        Returns:
            Self for chaining
        """
        self.record_transformers.append(transformer)
        return self

    def transform(self, record: Record) -> Optional[Record]:
        """Transform a record.

        Args:
            record: Source record

        Returns:
            Transformed record or None if filtered out
        """
        # Apply record filters: the first rejecting filter drops the record.
        for filter_func in self.record_filters:
            if not filter_func(record):
                return None

        # Built once per call so the per-field membership tests below are
        # O(1) instead of scanning the exclusion list for every field.
        excluded = set(self.field_filters)

        # Create new record
        new_record = Record()

        if self.field_mappings:
            # Apply field mappings
            for mapping in self.field_mappings:
                if mapping.source_field in record.fields:
                    source_field = record.fields[mapping.source_field]
                    value = mapping.apply(source_field.value)

                    new_record.fields[mapping.target_field] = Field(
                        name=mapping.target_field,
                        value=value,
                        # The type name is derived from the transformed value;
                        # a None value is labelled 'str'.
                        type=type(value).__name__ if value is not None else 'str',
                        metadata=source_field.metadata.copy() if source_field.metadata else {}
                    )
                elif mapping.default_value is not None:
                    # Source field absent: materialize the default, if any.
                    new_record.fields[mapping.target_field] = Field(
                        name=mapping.target_field,
                        value=mapping.default_value,
                        type=type(mapping.default_value).__name__
                    )
        else:
            # No explicit mappings, copy all fields that are not excluded
            for field_name, field in record.fields.items():
                if field_name not in excluded:
                    new_record.fields[field_name] = field.copy()

        # Exclude filtered fields (matters for mapped targets, which are
        # built before exclusion is considered).
        for field_name in excluded:
            if field_name in new_record.fields:
                del new_record.fields[field_name]

        # Copy metadata
        new_record.metadata = record.metadata.copy() if record.metadata else {}

        # Apply record transformers; a None result drops the record.
        for transformer in self.record_transformers:
            new_record = transformer(new_record)
            if new_record is None:
                return None

        return new_record
class TransformationPipeline:
    """Run a sequence of transformation stages over a record.

    A stage is either a ``DataTransformer`` (its ``transform`` method is
    used) or any callable taking a record and returning a record or None.
    """

    def __init__(self, *transformers: Union[DataTransformer, Callable[[Record], Optional[Record]]]):
        """Initialize transformation pipeline.

        Args:
            *transformers: Stages to run, in order
        """
        self.transformers = [*transformers]

    def add(self, transformer: Union[DataTransformer, Callable[[Record], Optional[Record]]]) -> "TransformationPipeline":
        """Append a stage to the end of the pipeline.

        Args:
            transformer: Stage to append

        Returns:
            Self for chaining
        """
        self.transformers.append(transformer)
        return self

    def transform(self, record: Record) -> Optional[Record]:
        """Feed the record through every stage in order.

        Args:
            record: Source record

        Returns:
            The output of the last stage, or None as soon as any stage
            returns None
        """
        result = record

        for stage in self.transformers:
            # DataTransformer instances expose .transform; anything else is
            # invoked directly as a callable.
            run = stage.transform if isinstance(stage, DataTransformer) else stage
            result = run(result)

            if result is None:
                return None

        return result

    def __call__(self, record: Record) -> Optional[Record]:
        """Allow the pipeline itself to be used as a callable stage.

        Args:
            record: Source record

        Returns:
            Same result as ``transform(record)``
        """
        return self.transform(record)