Coverage for src/dataknobs_data/migration/factory.py: 18%

122 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:23 -0700

1"""Factory classes for migration v2 components.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from typing import Any, TYPE_CHECKING 

7 

8from dataknobs_config import FactoryBase 

9 

10from .migration import Migration 

11from .migrator import Migrator 

12from .operations import ( 

13 AddField, 

14 CompositeOperation, 

15 Operation, 

16 RemoveField, 

17 RenameField, 

18 TransformField, 

19) 

20from .transformer import Transformer 

21 

22if TYPE_CHECKING: 

23 from collections.abc import Callable 

24 

25 

# Module-level logger: the factories below log an info message when they
# create an object and a warning when a config entry is unknown or unsupported.
logger = logging.getLogger(__name__)

27 

28 

class MigrationFactory(FactoryBase):
    """Factory for creating migrations from configuration.

    Configuration Options:
        from_version (str): Source version (defaults to ``"0.0"``)
        to_version (str): Target version (defaults to ``"1.0"``)
        description (str): Migration description (optional)
        operations (list): List of operation definitions

    Operation Types:
        - add_field: Add a new field. Requires ``field_name``; optional
          ``default_value`` and ``field_type`` (a ``FieldType`` member name,
          case-insensitive).
        - remove_field: Remove an existing field. Requires ``field_name``;
          optional ``store_removed`` (defaults to ``False``).
        - rename_field: Rename a field. Requires ``old_name`` and ``new_name``.
        - transform_field: Transform field values. Requires ``field_name`` and
          ``transform``; optional ``reverse``.
        - composite: Multiple operations combined, given as a nested
          ``operations`` list.

    Unknown operation types are logged as warnings and skipped.

    Transform Specifications:
        For security, arbitrary code is never evaluated. ``transform`` and
        ``reverse`` accept only these predefined forms:

        - ``"uppercase"``, ``"lowercase"``, ``"trim"``: applied to string
          values; any other value passes through unchanged.
        - ``{multiply: <number>}``, ``{divide: <non-zero number>}``: applied
          to int/float values (bools excluded); any other value passes
          through unchanged.

    Example Configuration:
        migrations:
          - name: v1_to_v2
            factory: migration
            from_version: "1.0"
            to_version: "2.0"
            description: Add user metadata fields
            operations:
              - type: add_field
                field_name: created_at
                default_value: "2024-01-01"
              - type: rename_field
                old_name: username
                new_name: user_name
              - type: transform_field
                field_name: price
                transform:
                  multiply: 1.1
    """

    def create(self, **config) -> Migration:
        """Create a Migration instance from configuration.

        Args:
            **config: Migration configuration (see class docstring)

        Returns:
            Migration instance containing every recognised operation

        Raises:
            ValueError: If an operation is missing required keys or has an
                invalid transform specification.
        """
        from_version = config.get("from_version", "0.0")
        to_version = config.get("to_version", "1.0")
        description = config.get("description")

        logger.info("Creating migration: %s -> %s", from_version, to_version)

        migration = Migration(from_version, to_version, description)

        # ``operations:`` with no value loads as None from YAML; treat it as empty.
        for op_config in config.get("operations") or []:
            operation = self._create_operation(op_config)
            # Unknown types and empty composites come back as None and are skipped.
            if operation is not None:
                migration.add(operation)

        return migration

    def _create_operation(self, op_config: dict[str, Any]) -> Operation | None:
        """Create an operation from configuration.

        Args:
            op_config: Operation configuration; ``type`` selects the operation

        Returns:
            Operation instance, or None for an unknown type or an empty composite

        Raises:
            ValueError: If a required key for the selected type is missing or
                a transform specification is invalid.
        """
        # ``type: null`` must not crash on ``.lower()``; it is treated as unknown.
        op_type = str(op_config.get("type") or "").lower()

        if op_type == "add_field":
            field_name = op_config.get("field_name")
            if not field_name:
                raise ValueError("add_field operation requires field_name")
            return AddField(
                field_name=field_name,
                default_value=op_config.get("default_value"),
                field_type=self._parse_field_type(op_config.get("field_type")),
            )

        elif op_type == "remove_field":
            field_name = op_config.get("field_name")
            if not field_name:
                raise ValueError("remove_field operation requires field_name")
            return RemoveField(
                field_name=field_name,
                store_removed=op_config.get("store_removed", False),
            )

        elif op_type == "rename_field":
            old_name = op_config.get("old_name")
            new_name = op_config.get("new_name")
            if not old_name or not new_name:
                raise ValueError("rename_field operation requires old_name and new_name")
            return RenameField(
                old_name=old_name,
                new_name=new_name,
            )

        elif op_type == "transform_field":
            # Transforms come from a fixed whitelist (see _get_transform_function);
            # config strings are never evaluated as code.
            field_name = op_config.get("field_name")
            if not field_name:
                raise ValueError("transform_field operation requires field_name")
            transform_fn = self._get_transform_function(op_config.get("transform"))
            if not transform_fn:
                raise ValueError("transform_field operation requires transform function")
            reverse_fn = self._get_transform_function(op_config.get("reverse"))

            return TransformField(
                field_name=field_name,
                transform_fn=transform_fn,
                reverse_fn=reverse_fn,
            )

        elif op_type == "composite":
            sub_operations = [
                sub_op
                for sub_op in (
                    self._create_operation(sub_config)
                    for sub_config in op_config.get("operations") or []
                )
                if sub_op is not None
            ]
            # A composite with no usable children contributes nothing.
            return CompositeOperation(sub_operations) if sub_operations else None

        else:
            logger.warning("Unknown operation type: %s", op_type)
            return None

    def _parse_field_type(self, type_str: str | None):
        """Parse field type from string.

        Args:
            type_str: Field type name, matched case-insensitively against
                ``FieldType`` member names

        Returns:
            FieldType member, or None if ``type_str`` is empty or unknown
        """
        if not type_str:
            return None

        # Local import, as in the original module layout.
        from dataknobs_data.fields import FieldType

        try:
            return FieldType[type_str.upper()]
        except KeyError:
            logger.warning("Unknown field type: %s", type_str)
            return None

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values, excluding bool (a subclass of int)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _validate_factor(operation: str, factor: Any) -> int | float:
        """Return ``factor`` if it is a real number, else raise ValueError.

        Without this check a string factor (e.g. YAML ``"2"``) would turn
        ``5 * "2"`` into string repetition, and a bool would act as 0/1.
        """
        if isinstance(factor, bool) or not isinstance(factor, (int, float)):
            raise ValueError(
                f"{operation} transform factor must be a number, got {factor!r}"
            )
        return factor

    def _get_transform_function(self, transform_spec: Any) -> Callable | None:
        """Get transform function from specification.

        For security, we don't eval arbitrary code. Instead, we support
        predefined transform patterns (see class docstring).

        Args:
            transform_spec: ``"uppercase"``/``"lowercase"``/``"trim"``, or a
                dict with a ``multiply`` or ``divide`` number

        Returns:
            Transform function, or None if the spec is empty or unsupported

        Raises:
            ValueError: If a multiply/divide factor is not a number, or a
                divide factor is zero.
        """
        if not transform_spec:
            return None

        # Predefined string transforms leave non-string values untouched.
        if transform_spec == "uppercase":
            return lambda x: x.upper() if isinstance(x, str) else x
        elif transform_spec == "lowercase":
            return lambda x: x.lower() if isinstance(x, str) else x
        elif transform_spec == "trim":
            return lambda x: x.strip() if isinstance(x, str) else x
        elif isinstance(transform_spec, dict):
            # Numeric scaling; ``multiply`` wins if both keys are present.
            if "multiply" in transform_spec:
                factor = self._validate_factor("multiply", transform_spec["multiply"])
                return lambda x: x * factor if self._is_number(x) else x
            elif "divide" in transform_spec:
                factor = self._validate_factor("divide", transform_spec["divide"])
                if factor == 0:
                    raise ValueError("divide transform factor must be non-zero")
                return lambda x: x / factor if self._is_number(x) else x

        logger.warning("Unsupported transform specification: %s", transform_spec)
        return None

212 

213 

class TransformerFactory(FactoryBase):
    """Factory for creating transformers from configuration.

    Configuration Options:
        rules (list): List of transformation rules

    Rule Types:
        - map: Map the ``source`` field to the ``target`` field
        - rename: Rename ``old_name`` to ``new_name``
        - exclude: Remove the fields named in ``fields`` (a list, or a
          single field name)
        - add: Add ``field_name`` with the constant ``value``

    Rules missing their required keys, and rules of unknown type, are logged
    as warnings and skipped.

    Example Configuration:
        transformers:
          - name: cleanup_transformer
            factory: transformer
            rules:
              - type: map
                source: old_id
                target: id
              - type: exclude
                fields: [temp_field, debug_info]
              - type: add
                field_name: processed
                value: true
    """

    def create(self, **config) -> Transformer:
        """Create a Transformer instance from configuration.

        Args:
            **config: Transformer configuration (see class docstring)

        Returns:
            Transformer instance with every recognised rule applied
        """
        logger.info("Creating transformer")

        transformer = Transformer()

        # ``rules:`` with no value loads as None from YAML; treat it as empty.
        for rule_config in config.get("rules") or []:
            self._add_rule(transformer, rule_config)

        return transformer

    def _add_rule(self, transformer: Transformer, rule_config: dict[str, Any]) -> None:
        """Add a rule to the transformer.

        Args:
            transformer: Transformer to add rule to
            rule_config: Rule configuration; ``type`` selects the rule
        """
        # ``type: null`` must not crash on ``.lower()``; it is treated as unknown.
        rule_type = str(rule_config.get("type") or "").lower()

        if rule_type == "map":
            source = rule_config.get("source")
            if not source:
                logger.warning("Skipping map rule without 'source': %s", rule_config)
                return
            transformer.map(source, rule_config.get("target"))

        elif rule_type == "rename":
            old_name = rule_config.get("old_name")
            new_name = rule_config.get("new_name")
            if not (old_name and new_name):
                logger.warning(
                    "Skipping rename rule without 'old_name' and 'new_name': %s",
                    rule_config,
                )
                return
            transformer.rename(old_name, new_name)

        elif rule_type == "exclude":
            fields = rule_config.get("fields") or []
            # A scalar value (``fields: temp_field``) would otherwise be
            # splatted by ``*fields`` into individual characters.
            if isinstance(fields, str):
                fields = [fields]
            # An empty list is a harmless no-op, so it is not warned about.
            if fields:
                transformer.exclude(*fields)

        elif rule_type == "add":
            field_name = rule_config.get("field_name")
            if field_name is None:
                logger.warning("Skipping add rule without 'field_name': %s", rule_config)
                return
            # ``value`` may legitimately be None/falsy, so it is not checked.
            transformer.add(field_name, rule_config.get("value"))

        else:
            logger.warning("Unknown rule type: %s", rule_type)

295 

296 

class MigratorFactory(FactoryBase):
    """Config-system factory that produces ``Migrator`` instances.

    ``Migrator`` is constructed without any settings, so this factory exists
    only to give the configuration system the same ``create(**config)``
    entry point it uses for the other factories in this module.
    """

    def create(self, **config) -> Migrator:
        """Build and return a fresh ``Migrator``.

        Args:
            **config: Accepted for interface compatibility; ignored.

        Returns:
            A newly constructed ``Migrator``.
        """
        logger.info("Creating migrator")
        fresh_migrator = Migrator()
        return fresh_migrator

315 

316 

# Shared, module-level factory instances that the config system registers
# (one per factory class defined above).
migration_factory, transformer_factory, migrator_factory = (
    MigrationFactory(),
    TransformerFactory(),
    MigratorFactory(),
)