Coverage for src/dataknobs_data/validation_old_backup/schema.py: 0%

179 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-15 12:32 -0500

1"""Schema definition and validation.""" 

2 

3import logging 

4from dataclasses import dataclass, field 

5from datetime import datetime 

6from typing import Any, Callable, Dict, List, Optional, Set, Type, Union 

7 

8from dataknobs_data.fields import Field, FieldType 

9from dataknobs_data.records import Record 

10from .constraints import Constraint 

11from .type_coercion import TypeCoercer 

12 

# Module-level logger named after this module (``__name__``).
logger = logging.getLogger(name=__name__)

14 

15 

@dataclass
class ValidationError:
    """A single validation failure attached to one field.

    ``str()`` renders ``"<field_name>: <message>"`` and appends
    ``" (value: <value>)"`` whenever ``value`` is not ``None``.
    """

    field_name: str  # name of the field that failed
    error_type: str  # category such as "required", "type", "custom", "extra_field"
    message: str  # human-readable explanation
    value: Any = None  # offending value; None means "not reported"

    def __str__(self) -> str:
        text = f"{self.field_name}: {self.message}"
        if self.value is None:
            return text
        return f"{text} (value: {self.value})"

28 

29 

@dataclass
class ValidationResult:
    """Outcome of validating a value or a record.

    Attributes:
        is_valid: Overall verdict. ``add_error`` and ``merge`` can only move it
            toward False; nothing in this class sets it back to True.
        errors: Collected ``ValidationError`` objects.
        warnings: Free-form warning messages; they never affect ``is_valid``.
    """

    is_valid: bool
    errors: List[ValidationError] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    def add_error(self, field_name: str, error_type: str, message: str, value: Any = None) -> None:
        """Append a ``ValidationError`` built from the arguments and mark the result invalid."""
        self.errors.append(
            ValidationError(
                field_name=field_name,
                error_type=error_type,
                message=message,
                value=value,
            )
        )
        self.is_valid = False

    def add_warning(self, message: str) -> None:
        """Append a warning message; ``is_valid`` is left untouched."""
        self.warnings.append(message)

    def merge(self, other: "ValidationResult") -> None:
        """Absorb ``other``'s errors and warnings into this result, in place.

        The merged result stays valid only if both results were valid.
        """
        self.errors.extend(other.errors)
        self.warnings.extend(other.warnings)
        self.is_valid = self.is_valid and other.is_valid

    def __str__(self) -> str:
        if not self.is_valid:
            details = "\n".join(map(str, self.errors))
            return f"Validation failed with {len(self.errors)} error(s):\n{details}"
        return "Validation successful"

57 

58 

@dataclass
class FieldDefinition:
    """Definition of a field in a schema.

    ``type`` may be a Python class (checked with ``isinstance``), a
    ``FieldType`` member, or a type-name string such as ``"int"`` (matched
    case-insensitively). Unrecognized ``FieldType`` members, unrecognized
    names and any other kind of type spec accept every value.
    """

    name: str
    type: Union[Type, FieldType, str]
    required: bool = False
    default: Any = None
    constraints: List[Constraint] = field(default_factory=list)
    description: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
    custom_validator: Optional[Callable[[Any], bool]] = None

    def validate(self, value: Any, coerce: bool = False) -> ValidationResult:
        """Validate a value against this field definition.

        Checks run in order, and the first three stop at their first failure:

        1. Required: ``None`` or ``""`` on a required field is an error.
        2. A ``None`` value on an optional field is accepted without further checks.
        3. When ``coerce`` is True, ``TypeCoercer.coerce`` is applied; any
           exception it raises is reported as a ``"type"`` error.
        4. Type check via ``_validate_type``.

        If those pass, every constraint and the custom validator run, and all
        of their failures are collected.

        Args:
            value: The value to check.
            coerce: Attempt type coercion before the type check.

        Returns:
            A ``ValidationResult``; ``is_valid`` is False if any error was added.
        """
        result = ValidationResult(is_valid=True)

        # Check required
        if self.required and (value is None or value == ""):
            result.add_error(self.name, "required", "Field is required", value)
            return result

        # Required-and-None already returned above, so None here means an
        # optional field that was left empty.
        if value is None:
            return result

        # Type coercion if requested
        if coerce:
            try:
                value = TypeCoercer().coerce(value, self.type)
            except Exception as e:  # coercer failures are reported, not raised
                result.add_error(self.name, "type", f"Type coercion failed: {e}", value)
                return result

        # Type validation
        if not self._validate_type(value):
            expected_type = self.type.__name__ if hasattr(self.type, '__name__') else str(self.type)
            actual_type = type(value).__name__
            result.add_error(
                self.name,
                "type",
                f"Expected type {expected_type}, got {actual_type}",
                value
            )
            return result

        # Apply constraints; every failing constraint is reported.
        for constraint in self.constraints:
            if not constraint.validate(value):
                result.add_error(
                    self.name,
                    constraint.name,
                    constraint.get_error_message(value),
                    value
                )

        # Custom validation: a False return or a raised exception both fail.
        if self.custom_validator:
            try:
                if not self.custom_validator(value):
                    result.add_error(self.name, "custom", "Custom validation failed", value)
            except Exception as e:
                result.add_error(self.name, "custom", f"Custom validation error: {e}", value)

        return result

    def _validate_type(self, value: Any) -> bool:
        """Check if value matches the expected type.

        Booleans are not accepted as integers or floats, even though ``bool``
        subclasses ``int`` (see ``_isinstance_no_bool``).
        """
        if isinstance(self.type, type):
            expected = self.type
        elif isinstance(self.type, FieldType):
            # Map FieldType to Python types
            type_map = {
                FieldType.STRING: str,
                FieldType.INTEGER: int,
                FieldType.FLOAT: (int, float),
                FieldType.BOOLEAN: bool,
                FieldType.DATETIME: (datetime, str),  # Allow string for datetime
                FieldType.JSON: (list, dict),  # JSON can be list or dict
                FieldType.TEXT: str,
                FieldType.BINARY: bytes,
            }
            expected = type_map.get(self.type, object)
        elif isinstance(self.type, str):
            # String type name
            type_map = {
                'str': str,
                'int': int,
                'float': (int, float),
                'bool': bool,
                'list': list,
                'dict': dict,
                'datetime': (datetime, str),
            }
            expected = type_map.get(self.type.lower(), object)
        else:
            return True  # Unknown type, allow anything

        return self._isinstance_no_bool(value, expected)

    @staticmethod
    def _isinstance_no_bool(value: Any, expected: Union[type, tuple]) -> bool:
        """Like ``isinstance(value, expected)``, but bools do not count as int/float.

        ``bool`` subclasses ``int``, so ``isinstance(True, int)`` is True. Without
        this guard, ``True`` would pass an INTEGER or FLOAT check. A bool is
        still accepted when another expected type (e.g. ``bool`` or
        ``object``) admits it.
        """
        if not isinstance(value, expected):
            return False
        if not isinstance(value, bool):
            return True
        candidates = expected if isinstance(expected, tuple) else (expected,)
        return any(
            candidate not in (int, float) and isinstance(value, candidate)
            for candidate in candidates
        )

158 

159 

class Schema:
    """Define and validate record schemas.

    A schema holds ``FieldDefinition`` objects keyed by field name. It can
    validate ``Record`` objects, coerce raw data into records, and convert
    itself to and from a plain dictionary.
    """

    def __init__(
        self,
        fields: Optional[Dict[str, FieldDefinition]] = None,
        name: str = "",
        version: str = "1.0.0",
        strict: bool = False
    ):
        """Initialize schema.

        Args:
            fields: Field definitions keyed by field name. The mapping is
                copied, so ``add_field``/``remove_field`` never modify the
                caller's dictionary.
            name: Schema name; ``coerce`` stamps it into record metadata.
            version: Schema version; ``coerce`` stamps it into record metadata.
            strict: If True, reject records with extra fields
        """
        # Copy so later add_field/remove_field calls never mutate a dict the caller owns.
        self.fields: Dict[str, FieldDefinition] = dict(fields) if fields else {}
        self.name = name
        self.version = version
        self.strict = strict
        # Lookup table keyed by each definition's own ``name``.
        self._field_index: Dict[str, FieldDefinition] = {}
        self._build_index()

    def _build_index(self) -> None:
        """Rebuild ``_field_index`` from ``self.fields``."""
        self._field_index = {fd.name: fd for fd in self.fields.values()}

    def add_field(self, field_def: FieldDefinition) -> None:
        """Add (or replace) a field definition, keyed by its ``name``."""
        self.fields[field_def.name] = field_def
        self._field_index[field_def.name] = field_def

    def remove_field(self, field_name: str) -> None:
        """Remove the field stored under ``field_name``; unknown names are ignored."""
        if field_name in self.fields:
            del self.fields[field_name]
            # Rebuild instead of deleting by key. The index is keyed by each
            # definition's ``name``, which need not equal its key in
            # ``fields`` when the constructor received a caller-built dict.
            self._build_index()

    def validate(self, record: Record, coerce: bool = False) -> ValidationResult:
        """Validate a record against this schema.

        Args:
            record: Record to validate
            coerce: Whether to attempt type coercion

        Returns:
            ValidationResult with validation status and errors. In strict
            mode, extra-field errors are listed in sorted field-name order.
        """
        result = ValidationResult(is_valid=True)

        # Check for extra fields in strict mode
        if self.strict:
            extra_fields = set(record.fields.keys()) - set(self._field_index.keys())
            # Sort so error order is stable across runs. Set iteration order
            # for strings varies with hash randomization.
            for field_name in sorted(extra_fields, key=str):
                result.add_error(field_name, "extra_field", "Field not defined in schema")

        # Validate each schema field; a missing field is validated as None.
        for field_name, field_def in self._field_index.items():
            if field_name in record.fields:
                field_value = record.fields[field_name].value
            else:
                field_value = None
            result.merge(field_def.validate(field_value, coerce=coerce))

        return result

    def coerce(self, data: Union[Dict[str, Any], Record]) -> Record:
        """Coerce raw data to match schema types.

        A field whose coercion fails is logged and replaced by its default when
        the default is not None; otherwise it keeps its original value. Absent
        fields with a non-None default are added.

        Args:
            data: Raw data dictionary or existing record (the record is copied)

        Returns:
            Record with coerced field values and ``schema_name`` /
            ``schema_version`` set in its metadata.

        Raises:
            ValueError: If a required field is absent and has no default.
        """
        if isinstance(data, Record):
            record = data.copy()
        else:
            record = Record()
            for key, value in data.items():
                record.fields[key] = Field(name=key, value=value)

        coercer = TypeCoercer()

        for field_name, field_def in self._field_index.items():
            if field_name in record.fields:
                existing = record.fields[field_name]
                try:
                    existing.value = coercer.coerce(existing.value, field_def.type)
                    existing.type = field_def.type
                except Exception as e:  # best-effort: log and fall back
                    logger.warning("Failed to coerce %s: %s", field_name, e)
                    # Use default if coercion fails
                    if field_def.default is not None:
                        existing.value = field_def.default
            elif field_def.default is not None:
                # Add field with default value
                record.fields[field_name] = Field(
                    name=field_name,
                    value=field_def.default,
                    type=field_def.type,
                    metadata=field_def.metadata.copy()
                )
            elif field_def.required:
                # Required field is missing
                raise ValueError(f"Required field '{field_name}' is missing")

        # Add schema version to metadata
        if not record.metadata:
            record.metadata = {}
        record.metadata['schema_name'] = self.name
        record.metadata['schema_version'] = self.version

        return record

    @staticmethod
    def _serialize_type(field_type: Union[Type, FieldType, str]) -> str:
        """Render a field type for ``to_dict``.

        Python classes are written by ``__name__``, so built-ins such as
        ``int`` round-trip through ``from_dict`` as the type name ``"int"``
        rather than the unparseable ``"<class 'int'>"``. Everything else is
        passed through ``str()``.
        """
        if isinstance(field_type, type):
            return field_type.__name__
        return str(field_type)

    def to_dict(self) -> Dict[str, Any]:
        """Convert schema to dictionary."""
        return {
            'name': self.name,
            'version': self.version,
            'strict': self.strict,
            'fields': {
                name: {
                    'type': self._serialize_type(field_def.type),
                    'required': field_def.required,
                    'default': field_def.default,
                    'description': field_def.description,
                    'metadata': field_def.metadata,
                    'constraints': [c.to_dict() for c in field_def.constraints]
                }
                for name, field_def in self.fields.items()
            }
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Schema":
        """Create schema from dictionary (the inverse of ``to_dict``).

        Missing keys fall back to the same defaults as ``__init__`` and
        ``FieldDefinition``; a field's type defaults to ``'str'``.
        """
        fields = {}
        for field_name, field_data in data.get('fields', {}).items():
            fields[field_name] = FieldDefinition(
                name=field_name,
                type=field_data.get('type', 'str'),
                required=field_data.get('required', False),
                default=field_data.get('default'),
                description=field_data.get('description', ''),
                # Copy so the schema does not share a dict with the input data.
                metadata=dict(field_data.get('metadata') or {}),
                constraints=[
                    Constraint.from_dict(c)
                    for c in field_data.get('constraints', [])
                ]
            )

        return cls(
            fields=fields,
            name=data.get('name', ''),
            version=data.get('version', '1.0.0'),
            strict=data.get('strict', False)
        )

329 

330 

class SchemaValidator:
    """Batch schema validation with caching."""

    def __init__(self, schema: Schema):
        """Initialize validator with a schema."""
        self.schema = schema
        # Maps (str(record.to_dict()), coerce) -> result. Unbounded until
        # clear_cache() is called.
        self._validation_cache: Dict[Any, ValidationResult] = {}

    def validate_batch(
        self,
        records: List[Record],
        coerce: bool = False,
        parallel: bool = False
    ) -> List[ValidationResult]:
        """Validate multiple records.

        Args:
            records: Records to validate
            coerce: Whether to attempt type coercion
            parallel: Whether to validate in a thread pool

        Returns:
            List of validation results, in the same order as ``records``.
        """
        if not parallel:
            return [self.schema.validate(record, coerce) for record in records]

        import concurrent.futures

        def _validate_one(record: Record) -> ValidationResult:
            return self.schema.validate(record, coerce)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Executor.map yields results in input order and re-raises a
            # worker's exception when that result is reached.
            return list(executor.map(_validate_one, records))

    def validate_with_cache(self, record: Record, coerce: bool = False) -> ValidationResult:
        """Validate with caching based on the record's serialized contents.

        The cache key includes ``coerce``, because coercion can change the
        outcome. The serialized record itself is used rather than only its
        ``hash()``, so colliding hashes cannot return another record's result.
        A cache hit returns the same ``ValidationResult`` object as the
        original call, so mutating a returned result affects later hits.
        """
        cache_key = (str(record.to_dict()), bool(coerce))

        cached = self._validation_cache.get(cache_key)
        if cached is not None:
            return cached

        result = self.schema.validate(record, coerce)
        self._validation_cache[cache_key] = result
        return result

    def clear_cache(self) -> None:
        """Clear validation cache."""
        self._validation_cache.clear()