Coverage for src/dataknobs_data/migration/factory.py: 18%
122 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
1"""Factory classes for migration v2 components."""
3from __future__ import annotations
5import logging
6from typing import Any, TYPE_CHECKING
8from dataknobs_config import FactoryBase
10from .migration import Migration
11from .migrator import Migrator
12from .operations import (
13 AddField,
14 CompositeOperation,
15 Operation,
16 RemoveField,
17 RenameField,
18 TransformField,
19)
20from .transformer import Transformer
22if TYPE_CHECKING:
23 from collections.abc import Callable
26logger = logging.getLogger(__name__)
class MigrationFactory(FactoryBase):
    """Factory for creating migrations from configuration.

    Configuration Options:
        from_version (str): Source version (defaults to "0.0")
        to_version (str): Target version (defaults to "1.0")
        description (str): Migration description
        operations (list): List of operation definitions

    Operation Types:
        - add_field: Add a new field
        - remove_field: Remove an existing field
        - rename_field: Rename a field
        - transform_field: Transform field values
        - composite: Multiple operations combined

    Supported Transforms (``transform`` / ``reverse`` of transform_field):
        - "uppercase", "lowercase", "trim": applied to string values only
        - {multiply: <factor>}: applied to int/float values only
        - {divide: <non-zero factor>}: applied to int/float values only

        Transform specifications are never evaluated as code, so a string
        such as "lambda x: x * 1.1" is rejected as unsupported.

    Example Configuration:
        migrations:
          - name: v1_to_v2
            factory: migration
            from_version: "1.0"
            to_version: "2.0"
            description: Add user metadata fields
            operations:
              - type: add_field
                field_name: created_at
                default_value: "2024-01-01"
              - type: rename_field
                old_name: username
                new_name: user_name
              - type: transform_field
                field_name: price
                transform: {multiply: 1.1}
                reverse: {divide: 1.1}
    """

    def create(self, **config) -> Migration:
        """Create a Migration instance from configuration.

        Args:
            **config: Migration configuration (see the class docstring).

        Returns:
            Migration instance with every recognised operation added.

        Raises:
            ValueError: If an operation definition lacks a required key.
        """
        from_version = config.get("from_version", "0.0")
        to_version = config.get("to_version", "1.0")
        description = config.get("description")

        logger.info(f"Creating migration: {from_version} -> {to_version}")

        migration = Migration(from_version, to_version, description)

        # ``or []`` also covers an explicit ``operations: null`` in the config,
        # which ``config.get("operations", [])`` would hand back as None.
        for op_config in config.get("operations") or []:
            operation = self._create_operation(op_config)
            if operation:
                migration.add(operation)

        return migration

    def _create_operation(self, op_config: dict[str, Any]) -> Operation | None:
        """Create an operation from configuration.

        Args:
            op_config: Operation configuration; its ``type`` key selects the
                operation class (matched case-insensitively).

        Returns:
            Operation instance, or None when the type is unknown or a
            composite ends up with no usable sub-operations.

        Raises:
            ValueError: If a required key for the selected type is missing,
                or a transform_field has no supported ``transform``.
        """
        # A missing key and an explicit ``type: null`` are both treated as "".
        op_type = (op_config.get("type") or "").lower()

        if op_type == "add_field":
            field_name = op_config.get("field_name")
            if not field_name:
                raise ValueError("add_field operation requires field_name")
            return AddField(
                field_name=field_name,
                default_value=op_config.get("default_value"),
                field_type=self._parse_field_type(op_config.get("field_type")),
            )

        if op_type == "remove_field":
            field_name = op_config.get("field_name")
            if not field_name:
                raise ValueError("remove_field operation requires field_name")
            return RemoveField(
                field_name=field_name,
                store_removed=op_config.get("store_removed", False),
            )

        if op_type == "rename_field":
            old_name = op_config.get("old_name")
            new_name = op_config.get("new_name")
            if not old_name or not new_name:
                raise ValueError("rename_field operation requires old_name and new_name")
            return RenameField(
                old_name=old_name,
                new_name=new_name,
            )

        if op_type == "transform_field":
            # Transform functions are looked up from a fixed set of predefined
            # specs; config strings are never evaluated as code.
            field_name = op_config.get("field_name")
            if not field_name:
                raise ValueError("transform_field operation requires field_name")
            transform_fn = self._get_transform_function(op_config.get("transform"))
            if not transform_fn:
                raise ValueError("transform_field operation requires transform function")
            # The reverse transform is optional; None is passed through as-is.
            reverse_fn = self._get_transform_function(op_config.get("reverse"))

            return TransformField(
                field_name=field_name,
                transform_fn=transform_fn,
                reverse_fn=reverse_fn,
            )

        if op_type == "composite":
            sub_operations = []
            for sub_config in op_config.get("operations") or []:
                sub_op = self._create_operation(sub_config)
                if sub_op:
                    sub_operations.append(sub_op)
            return CompositeOperation(sub_operations) if sub_operations else None

        logger.warning(f"Unknown operation type: {op_type}")
        return None

    def _parse_field_type(self, type_str: str | None):
        """Parse field type from string.

        Args:
            type_str: Name of a FieldType member (matched case-insensitively).

        Returns:
            The matching FieldType member, or None when ``type_str`` is empty
            or names no member (a warning is logged in the latter case).
        """
        if not type_str:
            return None

        # Imported here rather than at module level, as in the original code.
        from dataknobs_data.fields import FieldType

        try:
            return FieldType[type_str.upper()]
        except KeyError:
            logger.warning(f"Unknown field type: {type_str}")
            return None

    def _get_transform_function(self, transform_spec: Any) -> Callable | None:
        """Get transform function from specification.

        For security, arbitrary code is never evaluated. Only these
        predefined specifications are supported:

        - "uppercase", "lowercase", "trim": change string values and leave
          every other value untouched
        - {"multiply": factor} / {"divide": factor}: change int/float values
          and leave every other value untouched

        Args:
            transform_spec: Transform specification

        Returns:
            Transform function, or None when the specification is empty or
            unsupported (unsupported specifications also log a warning).
        """
        if not transform_spec:
            return None

        if transform_spec == "uppercase":
            return lambda x: x.upper() if isinstance(x, str) else x
        if transform_spec == "lowercase":
            return lambda x: x.lower() if isinstance(x, str) else x
        if transform_spec == "trim":
            return lambda x: x.strip() if isinstance(x, str) else x

        if isinstance(transform_spec, dict):
            if "multiply" in transform_spec:
                factor = transform_spec["multiply"]
                return lambda x: x * factor if isinstance(x, (int, float)) else x
            if "divide" in transform_spec:
                factor = transform_spec["divide"]
                # A zero divisor would raise ZeroDivisionError for every
                # numeric value once the migration runs, so it is rejected
                # here as an unsupported specification instead.
                if factor != 0:
                    return lambda x: x / factor if isinstance(x, (int, float)) else x

        logger.warning(f"Unsupported transform specification: {transform_spec}")
        return None
class TransformerFactory(FactoryBase):
    """Factory for creating transformers from configuration.

    Configuration Options:
        rules (list): List of transformation rules

    Rule Types:
        - map: Map a ``source`` field to a ``target`` field
        - rename: Rename a field (``old_name`` -> ``new_name``)
        - exclude: Remove fields (``fields``: a list of names or one name)
        - add: Add a new field (``field_name`` with ``value``)

    Rules that lack their required keys are skipped with a warning.

    Example Configuration:
        transformers:
          - name: cleanup_transformer
            factory: transformer
            rules:
              - type: map
                source: old_id
                target: id
              - type: exclude
                fields: [temp_field, debug_info]
              - type: add
                field_name: processed
                value: true
    """

    def create(self, **config) -> Transformer:
        """Create a Transformer instance from configuration.

        Args:
            **config: Transformer configuration (see the class docstring).

        Returns:
            Transformer instance with every usable rule applied in order.
        """
        logger.info("Creating transformer")

        transformer = Transformer()

        # ``or []`` also covers an explicit ``rules: null`` in the config.
        for rule_config in config.get("rules") or []:
            self._add_rule(transformer, rule_config)

        return transformer

    def _add_rule(self, transformer: Transformer, rule_config: dict[str, Any]) -> None:
        """Add a rule to the transformer.

        Unknown rule types and rules missing required keys are logged and
        skipped; nothing is raised for them.

        Args:
            transformer: Transformer to add rule to
            rule_config: Rule configuration; its ``type`` key selects the
                rule (matched case-insensitively).
        """
        # A missing key and an explicit ``type: null`` are both treated as "".
        rule_type = (rule_config.get("type") or "").lower()

        if rule_type == "map":
            source = rule_config.get("source")
            target = rule_config.get("target")
            if source:
                transformer.map(source, target)
            else:
                logger.warning(f"Skipping map rule without source: {rule_config}")

        elif rule_type == "rename":
            old_name = rule_config.get("old_name")
            new_name = rule_config.get("new_name")
            if old_name and new_name:
                transformer.rename(old_name, new_name)
            else:
                logger.warning(
                    f"Skipping rename rule without old_name and new_name: {rule_config}"
                )

        elif rule_type == "exclude":
            fields = rule_config.get("fields", [])
            # A scalar such as ``fields: temp_field`` would otherwise be
            # unpacked character by character by ``*fields`` below.
            if isinstance(fields, str):
                fields = [fields]
            if fields:
                transformer.exclude(*fields)
            else:
                logger.warning(f"Skipping exclude rule without fields: {rule_config}")

        elif rule_type == "add":
            field_name = rule_config.get("field_name")
            value = rule_config.get("value")
            if field_name is not None:
                transformer.add(field_name, value)
            else:
                logger.warning(f"Skipping add rule without field_name: {rule_config}")

        else:
            logger.warning(f"Unknown rule type: {rule_type}")
class MigratorFactory(FactoryBase):
    """Config-system factory that produces Migrator objects.

    A Migrator is built without any arguments; this class exists so the
    config system gets the same ``create`` entry point the other
    factories in this module expose.
    """

    def create(self, **config) -> Migrator:
        """Build a new Migrator.

        Args:
            **config: Accepted for interface consistency; not used.

        Returns:
            A freshly constructed Migrator.
        """
        logger.info("Creating migrator")
        migrator = Migrator()
        return migrator
# Shared module-level factory instances, built once at import time and
# intended for registration.
migration_factory = MigrationFactory()
transformer_factory = TransformerFactory()
migrator_factory = MigratorFactory()