Coverage for src/dataknobs_data/migration_old_backup/transformers.py: 0%

167 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-15 12:32 -0500

1"""Data transformation utilities for migrations.""" 

2 

3import logging 

4from dataclasses import dataclass 

5from typing import Any, Callable, Dict, List, Optional, Union 

6 

7from dataknobs_data.fields import Field 

8from dataknobs_data.records import Record 

9 

10logger = logging.getLogger(__name__) 

11 

12 

@dataclass
class FieldMapping:
    """Mapping between source and target fields."""
    source_field: str
    target_field: str
    transformer: Optional[Callable[[Any], Any]] = None
    default_value: Any = None

    def apply(self, source_value: Any) -> Any:
        """Apply transformation to field value."""
        # Missing source value: substitute the configured default, if any.
        if source_value is None and self.default_value is not None:
            return self.default_value

        # No transformer configured: pass the value through unchanged.
        if not self.transformer:
            return source_value

        try:
            return self.transformer(source_value)
        except Exception as e:
            # A failing transformer degrades to the default instead of raising.
            logger.warning(f"Transformation failed for {self.source_field}: {e}")
            return self.default_value

34 

35 

class ValueTransformer:
    """Common value transformation functions."""

    @staticmethod
    def to_string(value: Any) -> str:
        """Convert any value to string."""
        return "" if value is None else str(value)

    @staticmethod
    def to_int(value: Any) -> Optional[int]:
        """Convert value to integer."""
        if value is None:
            return None
        candidate = value
        try:
            if isinstance(candidate, str):
                # Handle string representations
                candidate = candidate.strip()
                if not candidate:
                    return None
            return int(float(candidate))  # Handle floats
        except (ValueError, TypeError):
            return None

    @staticmethod
    def to_float(value: Any) -> Optional[float]:
        """Convert value to float."""
        if value is None:
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def to_bool(value: Any) -> bool:
        """Convert value to boolean."""
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() in {'true', 'yes', '1', 'on'}
        return bool(value)

    @staticmethod
    def parse_json(value: str) -> Any:
        """Parse JSON string."""
        import json
        try:
            return json.loads(value)
        except (json.JSONDecodeError, TypeError):
            # Unparseable input is returned unchanged.
            return value

    @staticmethod
    def to_json(value: Any) -> str:
        """Convert value to JSON string."""
        import json
        try:
            return json.dumps(value)
        except (TypeError, ValueError):
            # Non-serializable values fall back to their string form.
            return str(value)

    @staticmethod
    def normalize_string(value: str) -> str:
        """Normalize string (lowercase, strip whitespace)."""
        text = value if isinstance(value, str) else str(value)
        return text.lower().strip()

    @staticmethod
    def truncate(max_length: int) -> Callable[[str], str]:
        """Create a truncation transformer."""
        def _truncate(value: str) -> str:
            text = value if isinstance(value, str) else str(value)
            return text[:max_length]
        return _truncate

    @staticmethod
    def regex_extract(pattern: str, group: int = 0) -> Callable[[str], Optional[str]]:
        """Create a regex extraction transformer."""
        import re
        regex = re.compile(pattern)

        def _extract(value: str) -> Optional[str]:
            text = value if isinstance(value, str) else str(value)
            match = regex.search(text)
            return match.group(group) if match else None

        return _extract

    @staticmethod
    def map_values(mapping: Dict[Any, Any], default: Any = None) -> Callable[[Any], Any]:
        """Create a value mapping transformer."""
        def _lookup(value: Any) -> Any:
            return mapping.get(value, default)
        return _lookup

    @staticmethod
    def chain(*transformers: Callable[[Any], Any]) -> Callable[[Any], Any]:
        """Chain multiple transformers."""
        def _chained(value: Any) -> Any:
            result = value
            for step in transformers:
                result = step(result)
            return result
        return _chained

145 

146 

class DataTransformer:
    """Transform records during migration.

    Configure with field mappings, record filters, record transformers and
    field exclusions, then call :meth:`transform` on each source record.
    """

    def __init__(self):
        """Initialize data transformer."""
        self.field_mappings: List[FieldMapping] = []
        self.record_filters: List[Callable[[Record], bool]] = []
        # FIX: annotation widened to Optional[Record] — transform() explicitly
        # checks each transformer's result for None and drops the record,
        # so returning None is part of the contract.
        self.record_transformers: List[Callable[[Record], Optional[Record]]] = []
        self.field_filters: List[str] = []  # Fields to exclude

    def add_field_mapping(
        self,
        source_field: str,
        target_field: Optional[str] = None,
        transformer: Optional[Callable[[Any], Any]] = None,
        default_value: Any = None
    ) -> "DataTransformer":
        """Add a field mapping.

        Args:
            source_field: Source field name
            target_field: Target field name (defaults to source_field)
            transformer: Optional value transformer
            default_value: Default value if source is None

        Returns:
            Self for chaining
        """
        self.field_mappings.append(FieldMapping(
            source_field=source_field,
            target_field=target_field or source_field,
            transformer=transformer,
            default_value=default_value
        ))
        return self

    def rename_field(self, old_name: str, new_name: str) -> "DataTransformer":
        """Rename a field.

        Args:
            old_name: Current field name
            new_name: New field name

        Returns:
            Self for chaining
        """
        return self.add_field_mapping(old_name, new_name)

    def exclude_fields(self, *field_names: str) -> "DataTransformer":
        """Exclude fields from transformation.

        Args:
            *field_names: Field names to exclude

        Returns:
            Self for chaining
        """
        self.field_filters.extend(field_names)
        return self

    def add_record_filter(self, filter_func: Callable[[Record], bool]) -> "DataTransformer":
        """Add a record filter.

        Records that don't pass the filter will be skipped.

        Args:
            filter_func: Function that returns True to keep the record

        Returns:
            Self for chaining
        """
        self.record_filters.append(filter_func)
        return self

    def add_record_transformer(self, transformer: Callable[[Record], Optional[Record]]) -> "DataTransformer":
        """Add a record-level transformer.

        Args:
            transformer: Function that transforms the entire record; it may
                return None to drop the record (see transform()).

        Returns:
            Self for chaining
        """
        self.record_transformers.append(transformer)
        return self

    def transform(self, record: Record) -> Optional[Record]:
        """Transform a record.

        Args:
            record: Source record

        Returns:
            Transformed record or None if filtered out
        """
        # Apply record filters first: any failing filter drops the record.
        for filter_func in self.record_filters:
            if not filter_func(record):
                return None

        # Create new record
        new_record = Record()

        # Apply field mappings
        if self.field_mappings:
            # Explicit mappings configured: ONLY mapped fields are carried over.
            for mapping in self.field_mappings:
                if mapping.source_field in record.fields:
                    source_field = record.fields[mapping.source_field]
                    value = mapping.apply(source_field.value)

                    new_record.fields[mapping.target_field] = Field(
                        name=mapping.target_field,
                        value=value,
                        # Fall back to 'str' when the transformed value is None.
                        type=type(value).__name__ if value is not None else 'str',
                        metadata=source_field.metadata.copy() if source_field.metadata else {}
                    )
                elif mapping.default_value is not None:
                    # Source field absent: materialize the default value.
                    new_record.fields[mapping.target_field] = Field(
                        name=mapping.target_field,
                        value=mapping.default_value,
                        type=type(mapping.default_value).__name__
                    )
        else:
            # No explicit mappings, copy all fields
            for field_name, field in record.fields.items():
                if field_name not in self.field_filters:
                    new_record.fields[field_name] = field.copy()

        # Exclude filtered fields (also applies to mapped target fields).
        for field_name in self.field_filters:
            if field_name in new_record.fields:
                del new_record.fields[field_name]

        # Copy metadata
        new_record.metadata = record.metadata.copy() if record.metadata else {}

        # Record transformers run last and may drop the record by returning None.
        for transformer in self.record_transformers:
            new_record = transformer(new_record)
            if new_record is None:
                return None

        return new_record

290 

291 

class TransformationPipeline:
    """Chain multiple data transformers."""

    def __init__(self, *transformers: Union[DataTransformer, Callable[[Record], Optional[Record]]]):
        """Initialize transformation pipeline.

        Args:
            *transformers: Transformers to chain
        """
        self.transformers = list(transformers)

    def add(self, transformer: Union[DataTransformer, Callable[[Record], Optional[Record]]]) -> "TransformationPipeline":
        """Add a transformer to the pipeline.

        Args:
            transformer: Transformer to add

        Returns:
            Self for chaining
        """
        self.transformers.append(transformer)
        return self

    def transform(self, record: Record) -> Optional[Record]:
        """Apply all transformations in sequence.

        Args:
            record: Source record

        Returns:
            Transformed record or None if filtered out
        """
        current = record
        for stage in self.transformers:
            # DataTransformer instances expose .transform(); anything else is
            # treated as a plain callable. Either may return None to stop early.
            step = stage.transform if isinstance(stage, DataTransformer) else stage
            current = step(current)
            if current is None:
                return None
        return current

    def __call__(self, record: Record) -> Optional[Record]:
        """Make pipeline callable.

        Args:
            record: Source record

        Returns:
            Transformed record or None if filtered out
        """
        return self.transform(record)