Coverage for src/dataknobs_data/migration_v2/transformer.py: 90%

88 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-15 12:29 -0500

1""" 

2Data transformation with fluent API. 

3""" 

4 

5from abc import ABC, abstractmethod 

6from dataclasses import dataclass 

7from typing import Any, Callable, List, Optional, Union 

8 

9from dataknobs_data.records import Record 

10from dataknobs_data.fields import FieldType 

11 

12 

class TransformRule(ABC):
    """
    Abstract base for a single step of a transformation pipeline.

    Concrete rules implement ``apply`` to produce a Record from an input
    Record. ``Transformer.transform`` runs rules in order and treats a
    ``None`` return from ``apply`` as the record being filtered out.
    """

    @abstractmethod
    def apply(self, record: Record) -> Record:
        """
        Transform ``record`` according to this rule.

        Args:
            record: Record to transform

        Returns:
            Transformed record
        """
        ...

28 

29 

@dataclass
class MapRule(TransformRule):
    """
    Map a field to another field, optionally transforming the value.

    Attributes:
        source: Name of the field to read the value from.
        target: Name of the field to write the value to; may equal ``source``.
        transform: Optional callable applied to the source value.
    """

    source: str
    target: str
    transform: Optional[Callable[[Any], Any]] = None

    def apply(self, record: Record) -> Record:
        """
        Apply field mapping.

        Builds a new Record from the input's fields, a copy of its metadata,
        and its id. If ``source`` is not present, that copy is returned as-is.
        Otherwise the source value (optionally transformed) is written to
        ``target``, and ``source`` is removed when it differs from ``target``.

        If ``transform`` raises, the exception message is stored in the
        result's metadata under ``_transform_error_<source>`` and the
        untransformed value is used instead.

        Args:
            record: Record to transform

        Returns:
            Transformed record
        """
        result = Record(
            data=dict(record.fields),
            metadata=record.metadata.copy(),
            id=record.id
        )

        if self.source not in record.fields:
            return result

        value = record.fields[self.source].value

        # Compare against None rather than relying on truthiness: a callable
        # object defining __bool__/__len__ may be falsy and still be valid.
        if self.transform is not None:
            try:
                value = self.transform(value)
            except Exception as e:
                # Best-effort: record the failure and keep the original value.
                # The assignment above never completed, so `value` is untouched.
                result.metadata[f"_transform_error_{self.source}"] = str(e)

        # A rename drops the old field so only the target remains.
        if self.target != self.source:
            del result.fields[self.source]

        result.set_field(self.target, value)
        return result

66 

67 

@dataclass
class ExcludeRule(TransformRule):
    """
    Exclude specified fields from the record.

    Attributes:
        fields: Names of the fields to drop. A single string is treated as
            one field name rather than as a sequence of characters.
    """

    fields: List[str]

    def apply(self, record: Record) -> Record:
        """
        Remove excluded fields.

        Builds a new Record with a copy of the input's metadata and its id,
        containing every field of the input whose name is not excluded.

        Args:
            record: Record to transform

        Returns:
            Record without the excluded fields
        """
        # Build the lookup set once per call. This gives O(1) membership per
        # field, and a bare string no longer matches by substring
        # (with a str, `"pass" in "password"` would be True).
        if isinstance(self.fields, str):
            excluded = {self.fields}
        else:
            excluded = set(self.fields)

        result = Record(
            data={},
            metadata=record.metadata.copy(),
            id=record.id
        )

        # Copy all fields except excluded ones
        for field_name, field in record.fields.items():
            if field_name not in excluded:
                result.fields[field_name] = field

        return result

88 

89 

@dataclass
class AddRule(TransformRule):
    """
    Add a new field with a computed or default value.

    Attributes:
        field_name: Name of the field to set on the resulting record.
        value: Either a static value, or a callable that is invoked with the
            input record to produce the value.
        field_type: Optional type passed through to ``set_field``.
    """

    field_name: str
    value: Union[Any, Callable[[Record], Any]]
    field_type: Optional[FieldType] = None

    def apply(self, record: Record) -> Record:
        """
        Add new field.

        Copies the input's fields, metadata and id into a new Record, then
        sets ``field_name``. If ``value`` is callable it is called with the
        input record. If that call raises, the message is stored in metadata
        under ``_compute_error_<field_name>`` and the field is set to None.

        Args:
            record: Record to transform

        Returns:
            Record with the added field
        """
        output = Record(
            data=dict(record.fields),
            metadata=record.metadata.copy(),
            id=record.id
        )

        new_value = self.value
        if callable(new_value):
            try:
                new_value = new_value(record)
            except Exception as exc:
                # Best-effort: keep going with None, but leave a trace.
                output.metadata[f"_compute_error_{self.field_name}"] = str(exc)
                new_value = None

        output.set_field(self.field_name, new_value, field_type=self.field_type)
        return output

119 

120 

class Transformer:
    """
    Stateless record transformer with fluent API.

    Holds an ordered list of TransformRule objects. The builder methods
    (``map``, ``rename``, ``exclude``, ``add``, ``add_rule``, ``clear``)
    modify that list and return ``self`` so calls can be chained. Applying
    the transformer keeps no per-record state between calls.
    """

    def __init__(self):
        """Initialize transformer with empty rule set."""
        self.rules: List[TransformRule] = []

    def map(
        self,
        source: str,
        target: Optional[str] = None,
        transform: Optional[Callable[[Any], Any]] = None
    ) -> 'Transformer':
        """
        Map a field, optionally transforming its value (fluent API).

        Args:
            source: Source field name
            target: Target field name; any falsy value (None or "") falls
                back to ``source``
            transform: Optional transformation function

        Returns:
            Self for chaining
        """
        destination = target if target else source
        rule = MapRule(source=source, target=destination, transform=transform)
        self.rules.append(rule)
        return self

    def rename(self, old_name: str, new_name: str) -> 'Transformer':
        """
        Rename a field (fluent API).

        Args:
            old_name: Current field name
            new_name: New field name

        Returns:
            Self for chaining
        """
        return self.map(source=old_name, target=new_name)

    def exclude(self, *fields: str) -> 'Transformer':
        """
        Exclude fields from the record (fluent API).

        Args:
            *fields: Field names to exclude

        Returns:
            Self for chaining
        """
        rule = ExcludeRule([*fields])
        self.rules.append(rule)
        return self

    def add(
        self,
        field_name: str,
        value: Union[Any, Callable[[Record], Any]],
        field_type: Optional[FieldType] = None
    ) -> 'Transformer':
        """
        Add a new field (fluent API).

        Args:
            field_name: Name of field to add
            value: Static value, or a function called with the record to
                compute the value
            field_type: Optional field type

        Returns:
            Self for chaining
        """
        rule = AddRule(field_name=field_name, value=value, field_type=field_type)
        self.rules.append(rule)
        return self

    def add_rule(self, rule: TransformRule) -> 'Transformer':
        """
        Add a custom transformation rule (fluent API).

        Args:
            rule: Custom transformation rule

        Returns:
            Self for chaining
        """
        self.rules.append(rule)
        return self

    def transform(self, record: Record) -> Optional[Record]:
        """
        Apply all transformation rules to a record, in insertion order.

        Args:
            record: Record to transform (None is passed straight through)

        Returns:
            Transformed record, or None if the input was None or any rule
            returned None (i.e. filtered the record out)
        """
        current: Optional[Record] = record
        for rule in self.rules:
            if current is None:
                # Nothing to work on: no input, or an earlier rule filtered it.
                break
            current = rule.apply(current)
        return current

    def transform_many(self, records: List[Record]) -> List[Record]:
        """
        Transform multiple records.

        Args:
            records: List of records to transform

        Returns:
            List of transformed records in input order, with filtered
            records (None results) omitted
        """
        candidates = (self.transform(record) for record in records)
        return [item for item in candidates if item is not None]

    def clear(self) -> 'Transformer':
        """
        Clear all transformation rules (fluent API).

        Returns:
            Self for chaining
        """
        # Empty the existing list in place rather than rebinding it.
        del self.rules[:]
        return self

    def __len__(self) -> int:
        """Return the number of transformation rules."""
        return len(self.rules)

    def __repr__(self) -> str:
        """Return a short summary showing how many rules are configured."""
        return f"Transformer(rules={len(self.rules)})"