Coverage for src/dataknobs_data/migration_v2/factory.py: 61%
106 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:32 -0500
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:32 -0500
1"""Factory classes for migration v2 components."""
3import logging
4from typing import Any, Dict, List, Optional, Callable
6from dataknobs_config import FactoryBase
8from .migration import Migration
9from .operations import (
10 Operation,
11 AddField,
12 RemoveField,
13 RenameField,
14 TransformField,
15 CompositeOperation,
16)
17from .transformer import Transformer
18from .migrator import Migrator
20logger = logging.getLogger(__name__)
23class MigrationFactory(FactoryBase):
24 """Factory for creating migrations from configuration.
26 Configuration Options:
27 from_version (str): Source version
28 to_version (str): Target version
29 description (str): Migration description
30 operations (list): List of operation definitions
32 Operation Types:
33 - add_field: Add a new field
34 - remove_field: Remove an existing field
35 - rename_field: Rename a field
36 - transform_field: Transform field values
37 - composite: Multiple operations combined
39 Example Configuration:
40 migrations:
41 - name: v1_to_v2
42 factory: migration
43 from_version: "1.0"
44 to_version: "2.0"
45 description: Add user metadata fields
46 operations:
47 - type: add_field
48 field_name: created_at
49 default_value: "2024-01-01"
50 - type: rename_field
51 old_name: username
52 new_name: user_name
53 - type: transform_field
54 field_name: price
55 transform: "lambda x: x * 1.1"
56 """
58 def create(self, **config) -> Migration:
59 """Create a Migration instance from configuration.
61 Args:
62 **config: Migration configuration
64 Returns:
65 Migration instance
66 """
67 from_version = config.get("from_version", "0.0")
68 to_version = config.get("to_version", "1.0")
69 description = config.get("description")
71 logger.info(f"Creating migration: {from_version} -> {to_version}")
73 migration = Migration(from_version, to_version, description)
75 # Add operations
76 operations = config.get("operations", [])
77 for op_config in operations:
78 operation = self._create_operation(op_config)
79 if operation:
80 migration.add(operation)
82 return migration
84 def _create_operation(self, op_config: Dict[str, Any]) -> Optional[Operation]:
85 """Create an operation from configuration.
87 Args:
88 op_config: Operation configuration
90 Returns:
91 Operation instance or None if invalid
92 """
93 op_type = op_config.get("type", "").lower()
95 if op_type == "add_field":
96 return AddField(
97 field_name=op_config.get("field_name"),
98 default_value=op_config.get("default_value"),
99 field_type=self._parse_field_type(op_config.get("field_type"))
100 )
102 elif op_type == "remove_field":
103 return RemoveField(
104 field_name=op_config.get("field_name"),
105 store_removed=op_config.get("store_removed", False)
106 )
108 elif op_type == "rename_field":
109 return RenameField(
110 old_name=op_config.get("old_name"),
111 new_name=op_config.get("new_name")
112 )
114 elif op_type == "transform_field":
115 # Note: Transform functions from config strings require careful handling
116 # For security, we don't eval arbitrary code. Instead, support predefined transforms.
117 transform_fn = self._get_transform_function(op_config.get("transform"))
118 reverse_fn = self._get_transform_function(op_config.get("reverse"))
120 return TransformField(
121 field_name=op_config.get("field_name"),
122 transform_fn=transform_fn,
123 reverse_fn=reverse_fn
124 )
126 elif op_type == "composite":
127 sub_operations = []
128 for sub_config in op_config.get("operations", []):
129 sub_op = self._create_operation(sub_config)
130 if sub_op:
131 sub_operations.append(sub_op)
132 return CompositeOperation(sub_operations) if sub_operations else None
134 else:
135 logger.warning(f"Unknown operation type: {op_type}")
136 return None
138 def _parse_field_type(self, type_str: Optional[str]):
139 """Parse field type from string.
141 Args:
142 type_str: Field type string
144 Returns:
145 FieldType or None
146 """
147 if not type_str:
148 return None
150 from dataknobs_data.fields import FieldType
152 try:
153 return FieldType[type_str.upper()]
154 except KeyError:
155 logger.warning(f"Unknown field type: {type_str}")
156 return None
158 def _get_transform_function(self, transform_spec: Any) -> Optional[Callable]:
159 """Get transform function from specification.
161 For security, we don't eval arbitrary code. Instead, we support
162 predefined transform patterns.
164 Args:
165 transform_spec: Transform specification
167 Returns:
168 Transform function or None
169 """
170 if not transform_spec:
171 return None
173 # Support predefined transforms
174 if transform_spec == "uppercase":
175 return lambda x: x.upper() if isinstance(x, str) else x
176 elif transform_spec == "lowercase":
177 return lambda x: x.lower() if isinstance(x, str) else x
178 elif transform_spec == "trim":
179 return lambda x: x.strip() if isinstance(x, str) else x
180 elif isinstance(transform_spec, dict):
181 # Support multiplication/division
182 if "multiply" in transform_spec:
183 factor = transform_spec["multiply"]
184 return lambda x: x * factor if isinstance(x, (int, float)) else x
185 elif "divide" in transform_spec:
186 factor = transform_spec["divide"]
187 return lambda x: x / factor if isinstance(x, (int, float)) else x
189 logger.warning(f"Unsupported transform specification: {transform_spec}")
190 return None
193class TransformerFactory(FactoryBase):
194 """Factory for creating transformers from configuration.
196 Configuration Options:
197 rules (list): List of transformation rules
199 Rule Types:
200 - map: Map field to another field with optional transformation
201 - rename: Rename a field
202 - exclude: Remove fields
203 - add: Add new fields
205 Example Configuration:
206 transformers:
207 - name: cleanup_transformer
208 factory: transformer
209 rules:
210 - type: map
211 source: old_id
212 target: id
213 - type: exclude
214 fields: [temp_field, debug_info]
215 - type: add
216 field_name: processed
217 value: true
218 """
220 def create(self, **config) -> Transformer:
221 """Create a Transformer instance from configuration.
223 Args:
224 **config: Transformer configuration
226 Returns:
227 Transformer instance
228 """
229 logger.info("Creating transformer")
231 transformer = Transformer()
233 # Add rules
234 rules = config.get("rules", [])
235 for rule_config in rules:
236 self._add_rule(transformer, rule_config)
238 return transformer
240 def _add_rule(self, transformer: Transformer, rule_config: Dict[str, Any]) -> None:
241 """Add a rule to the transformer.
243 Args:
244 transformer: Transformer to add rule to
245 rule_config: Rule configuration
246 """
247 rule_type = rule_config.get("type", "").lower()
249 if rule_type == "map":
250 source = rule_config.get("source")
251 target = rule_config.get("target")
252 if source:
253 transformer.map(source, target)
255 elif rule_type == "rename":
256 old_name = rule_config.get("old_name")
257 new_name = rule_config.get("new_name")
258 if old_name and new_name:
259 transformer.rename(old_name, new_name)
261 elif rule_type == "exclude":
262 fields = rule_config.get("fields", [])
263 if fields:
264 transformer.exclude(*fields)
266 elif rule_type == "add":
267 field_name = rule_config.get("field_name")
268 value = rule_config.get("value")
269 if field_name is not None:
270 transformer.add(field_name, value)
272 else:
273 logger.warning(f"Unknown rule type: {rule_type}")
276class MigratorFactory(FactoryBase):
277 """Factory for creating migrators.
279 The Migrator doesn't require configuration, but this factory
280 provides a consistent interface for the config system.
281 """
283 def create(self, **config) -> Migrator:
284 """Create a Migrator instance.
286 Args:
287 **config: Currently unused
289 Returns:
290 Migrator instance
291 """
292 logger.info("Creating migrator")
293 return Migrator()
296# Create singleton instances for registration
297migration_factory = MigrationFactory()
298transformer_factory = TransformerFactory()
299migrator_factory = MigratorFactory()