Coverage for src/dataknobs_data/validation_old_backup/schema.py: 0%
179 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:32 -0500
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:32 -0500
1"""Schema definition and validation."""
3import logging
4from dataclasses import dataclass, field
5from datetime import datetime
6from typing import Any, Callable, Dict, List, Optional, Set, Type, Union
8from dataknobs_data.fields import Field, FieldType
9from dataknobs_data.records import Record
10from .constraints import Constraint
11from .type_coercion import TypeCoercer
13logger = logging.getLogger(__name__)
@dataclass
class ValidationError:
    """A single validation failure attached to one field.

    Attributes:
        field_name: Name of the field the error refers to.
        error_type: Short category tag for the failure (callers in this
            module use values such as "required", "type" and "custom").
        message: Human-readable description of the failure.
        value: The offending value, or None when none was recorded.
    """

    field_name: str
    error_type: str
    message: str
    value: Any = None

    def __str__(self) -> str:
        """Render as ``field: message``, appending the value when it is not None."""
        summary = f"{self.field_name}: {self.message}"
        if self.value is None:
            return summary
        return f"{summary} (value: {self.value})"
@dataclass
class ValidationResult:
    """Outcome of a validation run.

    Attributes:
        is_valid: False once any error has been recorded or merged in.
        errors: Collected ValidationError entries.
        warnings: Collected warning messages; these do not affect is_valid.
    """

    is_valid: bool
    errors: List[ValidationError] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    def add_error(self, field_name: str, error_type: str, message: str, value: Any = None) -> None:
        """Record an error for ``field_name`` and mark the result invalid."""
        error = ValidationError(field_name, error_type, message, value)
        self.errors.append(error)
        self.is_valid = False

    def add_warning(self, message: str) -> None:
        """Record a warning message."""
        self.warnings.append(message)

    def merge(self, other: "ValidationResult") -> None:
        """Fold ``other``'s errors, warnings and validity into this result."""
        self.errors += other.errors
        self.warnings += other.warnings
        # Once invalid, stay invalid; otherwise adopt the other result's status.
        if self.is_valid:
            self.is_valid = other.is_valid

    def __str__(self) -> str:
        """Return a success line, or a header followed by one line per error."""
        if not self.is_valid:
            header = f"Validation failed with {len(self.errors)} error(s):\n"
            return header + "\n".join(map(str, self.errors))
        return "Validation successful"
@dataclass
class FieldDefinition:
    """Definition of a field in a schema.

    Attributes:
        name: Field name, used as the field label in reported errors.
        type: Expected type, given as a Python class, a FieldType member,
            or a type name string such as "int" or "datetime".
        required: When True, None and the empty string are rejected.
        default: Default value for the field (not consulted by validate()).
        constraints: Constraints applied after the type check passes.
        description: Free-form description.
        metadata: Arbitrary extra information about the field.
        custom_validator: Optional callable returning True when a value is
            acceptable; it runs after the constraints.
    """

    name: str
    type: Union[Type, FieldType, str]
    required: bool = False
    default: Any = None
    constraints: List[Constraint] = field(default_factory=list)
    description: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
    custom_validator: Optional[Callable[[Any], bool]] = None

    def validate(self, value: Any, coerce: bool = False) -> ValidationResult:
        """Validate a value against this field definition.

        Checks run in order: required, optional type coercion, type,
        constraints, custom validator. A failure in the required, coercion
        or type step returns immediately; constraint and custom-validator
        failures are all collected.

        Args:
            value: The value to check.
            coerce: When True, convert the value with TypeCoercer before
                the type check; later checks see the converted value.

        Returns:
            A ValidationResult holding every error found.
        """
        result = ValidationResult(is_valid=True)

        # Check required
        if self.required and (value is None or value == ""):
            result.add_error(self.name, "required", "Field is required", value)
            return result

        # A None that reaches this point belongs to an optional field
        # (required + None returned above), so there is nothing to check.
        if value is None:
            return result

        # Type coercion if requested
        if coerce:
            try:
                coercer = TypeCoercer()
                value = coercer.coerce(value, self.type)
            except Exception as e:
                # Any coercion failure is reported as a type error, not raised.
                result.add_error(self.name, "type", f"Type coercion failed: {e}", value)
                return result

        # Type validation
        if not self._validate_type(value):
            expected_type = getattr(self.type, '__name__', str(self.type))
            actual_type = type(value).__name__
            result.add_error(
                self.name,
                "type",
                f"Expected type {expected_type}, got {actual_type}",
                value
            )
            return result

        # Apply constraints
        for constraint in self.constraints:
            if not constraint.validate(value):
                result.add_error(
                    self.name,
                    constraint.name,
                    constraint.get_error_message(value),
                    value
                )

        # Custom validation
        if self.custom_validator:
            try:
                if not self.custom_validator(value):
                    result.add_error(self.name, "custom", "Custom validation failed", value)
            except Exception as e:
                # A validator that raises counts as a failed validation.
                result.add_error(self.name, "custom", f"Custom validation error: {e}", value)

        return result

    def _validate_type(self, value: Any) -> bool:
        """Check if value matches the expected type.

        Unknown FieldType members, unknown type-name strings, and type
        specifications of any other kind accept every value.
        """
        declared = self.type

        if isinstance(declared, type):
            return self._matches_type(value, declared)

        if isinstance(declared, FieldType):
            # Map FieldType to Python types
            by_field_type = {
                FieldType.STRING: str,
                FieldType.INTEGER: int,
                FieldType.FLOAT: (int, float),
                FieldType.BOOLEAN: bool,
                FieldType.DATETIME: (datetime, str),  # Allow string for datetime
                FieldType.JSON: (list, dict),  # JSON can be list or dict
                FieldType.TEXT: str,
                FieldType.BINARY: bytes,
            }
            return self._matches_type(value, by_field_type.get(declared, object))

        if isinstance(declared, str):
            # String type name (case-insensitive)
            by_name = {
                'str': str,
                'int': int,
                'float': (int, float),
                'bool': bool,
                'list': list,
                'dict': dict,
                'datetime': (datetime, str),
            }
            return self._matches_type(value, by_name.get(declared.lower(), object))

        return True  # Unknown type, allow anything

    @staticmethod
    def _matches_type(value: Any, expected: Any) -> bool:
        """isinstance() check that does not let a bool pass as an int.

        bool is a subclass of int, so a plain isinstance() would accept
        True/False for integer and float fields. For bool values the int
        entry is dropped from the expected types; bool, object and any
        other expected type still match as usual.
        """
        if isinstance(value, bool):
            candidates = expected if isinstance(expected, tuple) else (expected,)
            return isinstance(value, tuple(t for t in candidates if t is not int))
        return isinstance(value, expected)
class Schema:
    """Define and validate record schemas.

    A schema is a named, versioned collection of FieldDefinition objects.
    It can validate records, coerce raw data into records, and convert
    itself to and from a plain dictionary.
    """

    def __init__(
        self,
        fields: Optional[Dict[str, FieldDefinition]] = None,
        name: str = "",
        version: str = "1.0.0",
        strict: bool = False
    ):
        """Initialize schema.

        Args:
            fields: Field definitions
            name: Schema name
            version: Schema version
            strict: If True, reject records with extra fields
        """
        self.fields = fields or {}
        self.name = name
        self.version = version
        self.strict = strict
        self._field_index: Dict[str, FieldDefinition] = {}
        self._build_index()

    def _build_index(self) -> None:
        """Build the lookup index, keyed by each definition's own ``name``."""
        self._field_index = {
            field_def.name: field_def for field_def in self.fields.values()
        }

    def add_field(self, field_def: FieldDefinition) -> None:
        """Add a field definition to the schema, replacing any with the same name."""
        self.fields[field_def.name] = field_def
        self._field_index[field_def.name] = field_def

    def remove_field(self, field_name: str) -> None:
        """Remove a field from the schema; unknown names are ignored.

        The index is keyed by the definition's ``name``, which can differ
        from the key used in a ``fields`` dict passed to the constructor,
        so the index entry is dropped by the removed definition's name and
        a missing entry is tolerated instead of raising KeyError.
        """
        removed = self.fields.pop(field_name, None)
        if removed is not None:
            self._field_index.pop(removed.name, None)

    def validate(self, record: Record, coerce: bool = False) -> ValidationResult:
        """Validate a record against this schema.

        Args:
            record: Record to validate
            coerce: Whether to attempt type coercion

        Returns:
            ValidationResult with validation status and errors. In strict
            mode, extra-field errors are reported first, sorted by field
            name so the output is deterministic.
        """
        result = ValidationResult(is_valid=True)

        # Check for extra fields in strict mode
        if self.strict:
            extra_fields = set(record.fields.keys()) - set(self._field_index)
            for field_name in sorted(extra_fields):
                result.add_error(field_name, "extra_field", "Field not defined in schema")

        # Validate each field in schema; a field absent from the record is
        # validated as None so the required check can fire.
        for field_name, field_def in self._field_index.items():
            if field_name in record.fields:
                field_value = record.fields[field_name].value
            else:
                field_value = None
            result.merge(field_def.validate(field_value, coerce=coerce))

        return result

    def coerce(self, data: Union[Dict[str, Any], Record]) -> Record:
        """Coerce raw data to match schema types.

        Fields present in the data are converted with TypeCoercer. If a
        conversion fails, a warning is logged and the field falls back to
        the definition's default when that is not None; otherwise the
        original value is kept. Missing fields are filled from a non-None
        default. The schema name and version are written into the record
        metadata.

        Args:
            data: Raw data dictionary or existing record (a record is
                copied via its ``copy()`` method before modification)

        Returns:
            Record with coerced field values

        Raises:
            ValueError: If a required field is missing and has no default.
        """
        if isinstance(data, Record):
            record = data.copy()
        else:
            record = Record()
            for key, value in data.items():
                record.fields[key] = Field(name=key, value=value)

        coercer = TypeCoercer()

        for field_name, field_def in self._field_index.items():
            if field_name in record.fields:
                try:
                    current_value = record.fields[field_name].value
                    coerced_value = coercer.coerce(current_value, field_def.type)
                    record.fields[field_name].value = coerced_value
                    record.fields[field_name].type = field_def.type
                except Exception as e:
                    # Best effort: a field that cannot be coerced must not
                    # abort the whole record.
                    logger.warning("Failed to coerce %s: %s", field_name, e)
                    # Use default if coercion fails
                    if field_def.default is not None:
                        record.fields[field_name].value = field_def.default
            elif field_def.default is not None:
                # Add field with default value
                record.fields[field_name] = Field(
                    name=field_name,
                    value=field_def.default,
                    type=field_def.type,
                    metadata=field_def.metadata.copy()
                )
            elif field_def.required:
                # Required field is missing
                raise ValueError(f"Required field '{field_name}' is missing")

        # Add schema version to metadata
        if not record.metadata:
            record.metadata = {}
        record.metadata['schema_name'] = self.name
        record.metadata['schema_version'] = self.version

        return record

    @staticmethod
    def _type_to_str(field_type: Any) -> str:
        """Return the serialized form of a field type.

        Python classes are written by name (``int`` -> "int") rather than
        via str(), which would give "<class 'int'>". The name form is what
        the type-name lookup used during validation recognises, so type
        checks survive a to_dict()/from_dict() round trip. Other type
        specifications are passed through str().
        """
        if isinstance(field_type, type):
            return field_type.__name__
        return str(field_type)

    def to_dict(self) -> Dict[str, Any]:
        """Convert schema to dictionary.

        Custom validators are not included in the output.
        """
        return {
            'name': self.name,
            'version': self.version,
            'strict': self.strict,
            'fields': {
                name: {
                    'type': self._type_to_str(field_def.type),
                    'required': field_def.required,
                    'default': field_def.default,
                    'description': field_def.description,
                    'metadata': field_def.metadata,
                    'constraints': [c.to_dict() for c in field_def.constraints]
                }
                for name, field_def in self.fields.items()
            }
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Schema":
        """Create schema from dictionary.

        Missing keys fall back to the same defaults as the constructor and
        FieldDefinition; a field without a 'type' entry is treated as 'str'.
        Each field's metadata is shallow-copied so the new schema does not
        share a mutable dict with ``data``.
        """
        fields = {
            field_name: FieldDefinition(
                name=field_name,
                type=field_data.get('type', 'str'),
                required=field_data.get('required', False),
                default=field_data.get('default'),
                description=field_data.get('description', ''),
                metadata=dict(field_data.get('metadata') or {}),
                constraints=[
                    Constraint.from_dict(c)
                    for c in field_data.get('constraints', [])
                ]
            )
            for field_name, field_data in data.get('fields', {}).items()
        }

        return cls(
            fields=fields,
            name=data.get('name', ''),
            version=data.get('version', '1.0.0'),
            strict=data.get('strict', False)
        )
class SchemaValidator:
    """Batch schema validation with caching."""

    def __init__(self, schema: Schema):
        """Initialize validator with a schema."""
        self.schema = schema
        # Keyed by a hash of the record contents combined with the coerce flag.
        self._validation_cache: Dict[int, ValidationResult] = {}

    def validate_batch(
        self,
        records: List[Record],
        coerce: bool = False,
        parallel: bool = False
    ) -> List[ValidationResult]:
        """Validate multiple records.

        Args:
            records: Records to validate
            coerce: Whether to attempt type coercion
            parallel: Whether to validate in parallel (uses a thread pool)

        Returns:
            List of validation results, in the same order as ``records``
        """
        if not parallel:
            return [self.schema.validate(record, coerce) for record in records]

        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.schema.validate, record, coerce)
                for record in records
            ]
            # Collect in submission order so results line up with the input.
            return [future.result() for future in futures]

    def validate_with_cache(self, record: Record, coerce: bool = False) -> ValidationResult:
        """Validate with caching based on record contents and the coerce flag.

        The cache key covers both the string form of ``record.to_dict()``
        and ``coerce``, so a result computed with coercion is never
        returned for a request without it (or the reverse). The cached
        ValidationResult object itself is returned on a hit, and entries
        are not invalidated when the schema changes; call clear_cache()
        after modifying the schema.
        """
        cache_key = hash((str(record.to_dict()), bool(coerce)))

        cached = self._validation_cache.get(cache_key)
        if cached is not None:
            return cached

        result = self.schema.validate(record, coerce)
        self._validation_cache[cache_key] = result
        return result

    def clear_cache(self) -> None:
        """Clear validation cache."""
        self._validation_cache.clear()