Coverage for src/dataknobs_data/migration_v2/factory.py: 61%

106 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-15 12:32 -0500

1"""Factory classes for migration v2 components.""" 

2 

3import logging 

4from typing import Any, Dict, List, Optional, Callable 

5 

6from dataknobs_config import FactoryBase 

7 

8from .migration import Migration 

9from .operations import ( 

10 Operation, 

11 AddField, 

12 RemoveField, 

13 RenameField, 

14 TransformField, 

15 CompositeOperation, 

16) 

17from .transformer import Transformer 

18from .migrator import Migrator 

19 

20logger = logging.getLogger(__name__) 

21 

22 

class MigrationFactory(FactoryBase):
    """Factory for creating migrations from configuration.

    Configuration Options:
        from_version (str): Source version (defaults to "0.0")
        to_version (str): Target version (defaults to "1.0")
        description (str): Migration description
        operations (list): List of operation definitions

    Operation Types:
        - add_field: Add a new field
        - remove_field: Remove an existing field
        - rename_field: Rename a field
        - transform_field: Transform field values
        - composite: Multiple operations combined

    Transform Specifications (``transform`` / ``reverse`` keys):
        Code strings are never evaluated. Only these forms are accepted:
        - "uppercase", "lowercase", "trim": applied to str values
        - {multiply: <factor>} or {divide: <factor>}: applied to
          int/float values
        Values of any other type are passed through unchanged. Any other
        specification is logged as unsupported and yields no function.

    Example Configuration:
        migrations:
          - name: v1_to_v2
            factory: migration
            from_version: "1.0"
            to_version: "2.0"
            description: Add user metadata fields
            operations:
              - type: add_field
                field_name: created_at
                default_value: "2024-01-01"
              - type: rename_field
                old_name: username
                new_name: user_name
              - type: transform_field
                field_name: price
                transform: {multiply: 1.1}
                reverse: {divide: 1.1}
    """

    def create(self, **config) -> Migration:
        """Create a Migration instance from configuration.

        Args:
            **config: Migration configuration (see the class docstring
                for the recognised keys).

        Returns:
            Migration instance with every recognised operation added in
            the order it appears in the configuration.
        """
        from_version = config.get("from_version", "0.0")
        to_version = config.get("to_version", "1.0")
        description = config.get("description")

        logger.info("Creating migration: %s -> %s", from_version, to_version)

        migration = Migration(from_version, to_version, description)

        # ``or []`` also covers a key that is present with a null value,
        # which would otherwise fail when iterated.
        for op_config in config.get("operations") or []:
            operation = self._create_operation(op_config)
            if operation:
                migration.add(operation)

        return migration

    def _create_operation(self, op_config: Dict[str, Any]) -> Optional[Operation]:
        """Create an operation from configuration.

        Args:
            op_config: Operation configuration; its ``type`` key selects
                the operation class (matched case-insensitively).

        Returns:
            Operation instance, or None if the type is unknown or a
            composite ends up with no valid sub-operations.
        """
        # Normalise a missing, null or non-string type to a string so it
        # reaches the "unknown type" warning instead of raising.
        op_type = str(op_config.get("type") or "").lower()

        if op_type == "add_field":
            return AddField(
                field_name=op_config.get("field_name"),
                default_value=op_config.get("default_value"),
                field_type=self._parse_field_type(op_config.get("field_type")),
            )

        if op_type == "remove_field":
            return RemoveField(
                field_name=op_config.get("field_name"),
                store_removed=op_config.get("store_removed", False),
            )

        if op_type == "rename_field":
            return RenameField(
                old_name=op_config.get("old_name"),
                new_name=op_config.get("new_name"),
            )

        if op_type == "transform_field":
            # For security, config strings are never evaluated as code;
            # only the predefined transforms are supported. An unsupported
            # spec yields None here, which is passed on unchanged.
            transform_fn = self._get_transform_function(op_config.get("transform"))
            reverse_fn = self._get_transform_function(op_config.get("reverse"))

            return TransformField(
                field_name=op_config.get("field_name"),
                transform_fn=transform_fn,
                reverse_fn=reverse_fn,
            )

        if op_type == "composite":
            sub_operations = []
            for sub_config in op_config.get("operations") or []:
                sub_op = self._create_operation(sub_config)
                if sub_op:
                    sub_operations.append(sub_op)
            return CompositeOperation(sub_operations) if sub_operations else None

        logger.warning("Unknown operation type: %s", op_type)
        return None

    def _parse_field_type(self, type_str: Optional[str]) -> Optional[Any]:
        """Parse field type from string.

        Args:
            type_str: Field type name; looked up case-insensitively among
                the ``FieldType`` member names.

        Returns:
            FieldType member, or None if no name was given or it is unknown.
        """
        if not type_str:
            return None

        # Imported lazily, as in the original implementation.
        from dataknobs_data.fields import FieldType

        try:
            return FieldType[str(type_str).upper()]
        except KeyError:
            logger.warning("Unknown field type: %s", type_str)
            return None

    def _get_transform_function(self, transform_spec: Any) -> Optional[Callable]:
        """Get transform function from specification.

        For security, we don't eval arbitrary code. Instead, we support
        predefined transform patterns: the strings "uppercase",
        "lowercase" and "trim", and the mappings ``{"multiply": factor}``
        and ``{"divide": factor}``.

        Args:
            transform_spec: Transform specification

        Returns:
            Transform function, or None if the specification is empty or
            unsupported. Returned functions leave values of a non-matching
            type unchanged.
        """
        if not transform_spec:
            return None

        if transform_spec == "uppercase":
            return lambda x: x.upper() if isinstance(x, str) else x
        if transform_spec == "lowercase":
            return lambda x: x.lower() if isinstance(x, str) else x
        if transform_spec == "trim":
            return lambda x: x.strip() if isinstance(x, str) else x

        if isinstance(transform_spec, dict):
            # "multiply" takes precedence when both keys are present.
            if "multiply" in transform_spec:
                factor = transform_spec["multiply"]
                return lambda x: x * factor if isinstance(x, (int, float)) else x
            if "divide" in transform_spec:
                # NOTE(review): a zero factor is not rejected here and would
                # raise ZeroDivisionError when the transform is applied.
                factor = transform_spec["divide"]
                return lambda x: x / factor if isinstance(x, (int, float)) else x

        logger.warning("Unsupported transform specification: %s", transform_spec)
        return None

191 

192 

class TransformerFactory(FactoryBase):
    """Factory for creating transformers from configuration.

    Configuration Options:
        rules (list): List of transformation rules

    Rule Types:
        - map: Map a source field to a target field
          (keys: source, target)
        - rename: Rename a field (keys: old_name, new_name)
        - exclude: Remove fields (key: fields)
        - add: Add new fields (keys: field_name, value)

    Rules that lack their required keys, or have an unknown type, are
    skipped with a warning.

    Example Configuration:
        transformers:
          - name: cleanup_transformer
            factory: transformer
            rules:
              - type: map
                source: old_id
                target: id
              - type: exclude
                fields: [temp_field, debug_info]
              - type: add
                field_name: processed
                value: true
    """

    def create(self, **config) -> Transformer:
        """Create a Transformer instance from configuration.

        Args:
            **config: Transformer configuration

        Returns:
            Transformer instance with the configured rules applied in
            the order they are listed.
        """
        logger.info("Creating transformer")

        transformer = Transformer()

        # ``or []`` also covers a ``rules`` key that is present but null.
        for rule_config in config.get("rules") or []:
            self._add_rule(transformer, rule_config)

        return transformer

    def _add_rule(self, transformer: Transformer, rule_config: Dict[str, Any]) -> None:
        """Add a rule to the transformer.

        Args:
            transformer: Transformer to add rule to
            rule_config: Rule configuration; its ``type`` key selects the
                rule (matched case-insensitively).
        """
        # Normalise a missing, null or non-string type so it reaches the
        # "unknown rule type" warning instead of raising.
        rule_type = str(rule_config.get("type") or "").lower()

        if rule_type == "map":
            source = rule_config.get("source")
            target = rule_config.get("target")
            if source:
                transformer.map(source, target)
            else:
                logger.warning("Skipping 'map' rule without a source: %s", rule_config)

        elif rule_type == "rename":
            old_name = rule_config.get("old_name")
            new_name = rule_config.get("new_name")
            if old_name and new_name:
                transformer.rename(old_name, new_name)
            else:
                logger.warning(
                    "Skipping 'rename' rule without old_name and new_name: %s",
                    rule_config,
                )

        elif rule_type == "exclude":
            fields = rule_config.get("fields") or []
            # A bare string names a single field; star-expanding it would
            # exclude one field per character instead.
            if isinstance(fields, str):
                fields = [fields]
            if fields:
                transformer.exclude(*fields)
            else:
                logger.warning("Skipping 'exclude' rule without fields: %s", rule_config)

        elif rule_type == "add":
            field_name = rule_config.get("field_name")
            value = rule_config.get("value")
            if field_name is not None:
                transformer.add(field_name, value)
            else:
                logger.warning("Skipping 'add' rule without a field_name: %s", rule_config)

        else:
            logger.warning("Unknown rule type: %s", rule_type)

274 

275 

class MigratorFactory(FactoryBase):
    """Config-system factory that produces Migrator objects.

    A Migrator needs no configuration; this factory exists so the
    config system can build one through the same interface it uses
    for other components.
    """

    def create(self, **config) -> Migrator:
        """Build and return a new Migrator.

        Args:
            **config: Accepted for interface compatibility; not read.

        Returns:
            A newly constructed Migrator instance.
        """
        logger.info("Creating migrator")
        migrator = Migrator()
        return migrator

294 

295 

# Module-level singleton instances, one per factory class, for registration.
# The tuple is evaluated left to right, so instantiation order is unchanged.
migration_factory, transformer_factory, migrator_factory = (
    MigrationFactory(),
    TransformerFactory(),
    MigratorFactory(),
)