Coverage for src/dataknobs_data/migration/transformer.py: 44%

84 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-31 15:06 -0600

1"""Data transformation with fluent API. 

2""" 

3 

4from __future__ import annotations 

5 

6from abc import ABC, abstractmethod 

7from dataclasses import dataclass 

8from typing import Any, TYPE_CHECKING 

9 

10from dataknobs_data.records import Record 

11 

12if TYPE_CHECKING: 

13 from collections.abc import Callable 

14 from dataknobs_data.fields import FieldType 

15 

16 

class TransformRule(ABC):
    """Abstract interface implemented by every transformation rule.

    Concrete rules override :meth:`apply`; the class cannot be instantiated
    until that method is provided.
    """

    @abstractmethod
    def apply(self, record: Record) -> Record:
        """Transform a single record.

        Args:
            record: The record to transform.

        Returns:
            The transformed record.
        """

31 

32 

@dataclass
class MapRule(TransformRule):
    """Copy one field's value to a target field, optionally converting it.

    When ``target`` differs from ``source`` the source field is removed from
    the result, so the rule then behaves as a rename.
    """

    # Name of the field read from the incoming record.
    source: str
    # Name of the field that receives the (possibly converted) value.
    target: str
    # Optional converter called with the source value; skipped when falsy.
    transform: Callable[[Any], Any] | None = None

    def apply(self, record: Record) -> Record:
        """Return a new record with the mapping applied.

        A new ``Record`` is built from a shallow copy of the input's fields
        mapping, a copy of its metadata, and its id. If ``source`` is not
        among the input's fields, that copy is returned untouched. If
        ``transform`` raises, the exception text is stored in the result's
        metadata under ``_transform_error_<source>`` and the original value
        is used instead.

        Args:
            record: Record to map.

        Returns:
            The newly built record.
        """
        mapped = Record(
            data=dict(record.fields),
            metadata=record.metadata.copy(),
            id=record.id,
        )

        # Nothing to map when the source field is absent.
        if self.source not in record.fields:
            return mapped

        value = record.fields[self.source].value

        if self.transform:
            try:
                value = self.transform(value)
            except Exception as exc:
                # Best effort: note the failure and fall back to the
                # untransformed value.
                mapped.metadata[f"_transform_error_{self.source}"] = str(exc)
                value = record.fields[self.source].value

        # A differing target means a rename, so drop the old field first.
        if self.source != self.target:
            del mapped.fields[self.source]

        mapped.set_field(self.target, value)
        return mapped

69 

70 

@dataclass
class ExcludeRule(TransformRule):
    """Drop a set of named fields from a record."""

    # Names of the fields that must not appear in the result.
    fields: list[str]

    def apply(self, record: Record) -> Record:
        """Return a new record without the excluded fields.

        The result starts with empty data, a copy of the input's metadata,
        and the same id; every field whose name is not listed in
        ``self.fields`` is then carried over as the same field object.

        Args:
            record: Record to filter.

        Returns:
            The newly built record.
        """
        kept = Record(
            data={},
            metadata=record.metadata.copy(),
            id=record.id,
        )

        for name, field_obj in record.fields.items():
            if name in self.fields:
                # Excluded: leave it out of the result.
                continue
            kept.fields[name] = field_obj

        return kept

91 

92 

@dataclass
class AddRule(TransformRule):
    """Add a field whose value is either fixed or computed per record."""

    # Name of the field to set on the result.
    field_name: str
    # Static value, or a callable invoked with the input record.
    value: Any | Callable[[Record], Any]
    # Forwarded to ``set_field`` as ``field_type``.
    field_type: FieldType | None = None

    def apply(self, record: Record) -> Record:
        """Return a new record with the extra field set.

        Any callable ``value`` is called with the input record to obtain the
        field value. If that call raises, the exception text is stored in
        the result's metadata under ``_compute_error_<field_name>`` and the
        field is set to ``None``.

        Args:
            record: Record to extend.

        Returns:
            The newly built record.
        """
        extended = Record(
            data=dict(record.fields),
            metadata=record.metadata.copy(),
            id=record.id,
        )

        if not callable(self.value):
            # Static value: use it as given.
            new_value = self.value
        else:
            try:
                new_value = self.value(record)
            except Exception as exc:
                # Best effort: note the failure and store None instead.
                extended.metadata[f"_compute_error_{self.field_name}"] = str(exc)
                new_value = None

        extended.set_field(self.field_name, new_value, field_type=self.field_type)
        return extended

122 

123 

class Transformer:
    """Record transformer with a fluent, chainable API.

    Rules are collected in ``self.rules`` in the order they are added and are
    applied in that same order by :meth:`transform`. Every builder method
    returns ``self`` so calls can be chained.
    """

    def __init__(self):
        """Initialize transformer with an empty rule list."""
        self.rules: list[TransformRule] = []

    def map(
        self,
        source: str,
        target: str | None = None,
        transform: Callable[[Any], Any] | None = None
    ) -> Transformer:
        """Append a rule mapping ``source`` to ``target`` (fluent API).

        Args:
            source: Source field name.
            target: Target field name; a falsy value (``None`` or ``""``)
                falls back to ``source``.
            transform: Optional function applied to the field value.

        Returns:
            Self for chaining.
        """
        self.rules.append(MapRule(
            source=source,
            target=target or source,
            transform=transform
        ))
        return self

    def rename(self, old_name: str, new_name: str) -> Transformer:
        """Append a rule renaming a field (fluent API).

        Equivalent to ``map(old_name, new_name)`` with no value transform.

        Args:
            old_name: Current field name.
            new_name: New field name.

        Returns:
            Self for chaining.
        """
        return self.map(old_name, new_name)

    def exclude(self, *fields: str) -> Transformer:
        """Append a rule removing the given fields (fluent API).

        Args:
            *fields: Field names to exclude.

        Returns:
            Self for chaining.
        """
        self.rules.append(ExcludeRule(list(fields)))
        return self

    def add(
        self,
        field_name: str,
        value: Any | Callable[[Record], Any],
        field_type: FieldType | None = None
    ) -> Transformer:
        """Append a rule adding a new field (fluent API).

        Args:
            field_name: Name of the field to add.
            value: Static value, or a function computing the value from the
                record.
            field_type: Optional field type passed through to the rule.

        Returns:
            Self for chaining.
        """
        self.rules.append(AddRule(
            field_name=field_name,
            value=value,
            field_type=field_type
        ))
        return self

    def add_rule(self, rule: TransformRule) -> Transformer:
        """Append a custom transformation rule (fluent API).

        Args:
            rule: Object exposing ``apply(record)``.

        Returns:
            Self for chaining.
        """
        self.rules.append(rule)
        return self

    def transform(self, record: Record) -> Record | None:
        """Apply all rules, in insertion order, to a record.

        Each rule receives the output of the previous one. As soon as a rule
        returns ``None`` the record is treated as filtered out: the remaining
        rules are skipped and ``None`` is returned.

        Args:
            record: Record to transform.

        Returns:
            The transformed record, or ``None`` if a rule filtered it out.
            With no rules registered, ``record`` itself is returned.
        """
        result = record
        for rule in self.rules:
            result = rule.apply(result)
            if result is None:
                # Stop here: later rules expect a record and must never be
                # handed None.
                return None
        return result

    def transform_many(self, records: list[Record]) -> list[Record]:
        """Transform multiple records, dropping filtered ones.

        Args:
            records: Records to transform.

        Returns:
            Transformed records in input order; records for which
            :meth:`transform` returned ``None`` are omitted.
        """
        transformed = (self.transform(record) for record in records)
        return [record for record in transformed if record is not None]

    def clear(self) -> Transformer:
        """Remove all transformation rules (fluent API).

        Returns:
            Self for chaining.
        """
        self.rules.clear()
        return self

    def __len__(self) -> int:
        """Return the number of registered rules."""
        return len(self.rules)

    def __repr__(self) -> str:
        """Return a short representation showing the rule count."""
        return f"Transformer(rules={len(self.rules)})"