Coverage for src/dataknobs_fsm/functions/library/validators.py: 0%
225 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 14:11 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 14:11 -0700
1"""Built-in validator functions for FSM.
3This module provides commonly used validation functions that can be
4referenced in FSM configurations.
5"""
7import re
8from typing import Any, Dict, List, Union
10from pydantic import BaseModel, ValidationError
12from dataknobs_fsm.functions.base import IValidationFunction, ValidationError as FSMValidationError
class RequiredFieldsValidator(IValidationFunction):
    """Check that a record contains every mandatory field."""

    def __init__(self, fields: List[str], allow_none: bool = False):
        """Store the validator configuration.

        Args:
            fields: Names of the fields that must be present in the data.
            allow_none: When False, a field that is present but holds
                None is reported as an error.
        """
        self.fields = fields
        self.allow_none = allow_none

    def validate(self, data: Dict[str, Any]) -> bool:
        """Confirm that every required field is present (and non-None).

        Args:
            data: Data to validate; must be a dict.

        Returns:
            True when the data is valid. Failures are signalled by
            raising, never by returning False.

        Raises:
            FSMValidationError: If ``data`` is not a dict, if any required
                field is absent, or (unless ``allow_none``) if any required
                field is None. Absent fields are reported first.
        """
        if not isinstance(data, dict):
            raise FSMValidationError(f"Expected dict, got {type(data).__name__}")

        absent = [name for name in self.fields if name not in data]
        if absent:
            raise FSMValidationError(
                f"Missing required fields: {', '.join(absent)}"
            )

        if not self.allow_none:
            # Every required field is known to be present at this point.
            null_valued = [name for name in self.fields if data[name] is None]
            if null_valued:
                raise FSMValidationError(
                    f"Fields cannot be None: {', '.join(null_valued)}"
                )

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Describe this validator's configuration as a dictionary."""
        rules: Dict[str, Any] = {}
        rules["required_fields"] = self.fields
        rules["allow_none"] = self.allow_none
        return rules
class SchemaValidator(IValidationFunction):
    """Validate data against a Pydantic schema."""

    def __init__(self, schema: Union[type[BaseModel], Dict[str, Any]]):
        """Initialize the validator.

        Args:
            schema: Pydantic model class, or a dictionary of field
                definitions that is forwarded as keyword arguments to
                ``pydantic.create_model`` to build a model named
                "DynamicSchema".
        """
        if isinstance(schema, dict):
            # Create a dynamic Pydantic model from dictionary
            from pydantic import create_model
            self.schema = create_model("DynamicSchema", **schema)
        else:
            self.schema = schema

    def validate(self, data: Dict[str, Any]) -> bool:
        """Validate data by instantiating the schema with it.

        Args:
            data: Mapping whose items are passed to the schema as keyword
                arguments.

        Returns:
            True when the data is valid. Failures are signalled by
            raising, never by returning False.

        Raises:
            FSMValidationError: If ``data`` cannot be unpacked as keyword
                arguments, or if the schema rejects it (the message lists
                each failing field path with its error).
        """
        # ``**data`` needs ``keys()`` and ``__getitem__``; without this guard
        # a non-mapping input would surface as a bare TypeError instead of
        # the documented FSMValidationError.
        if not (hasattr(data, "keys") and hasattr(data, "__getitem__")):
            raise FSMValidationError(f"Expected dict, got {type(data).__name__}")

        try:
            self.schema(**data)
        except ValidationError as e:
            errors = [
                f"{'.'.join(str(loc) for loc in error['loc'])}: {error['msg']}"
                for error in e.errors()
            ]
            raise FSMValidationError(
                f"Schema validation failed: {'; '.join(errors)}"
            ) from e

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Get the validation rules.

        Returns:
            The schema's ``model_json_schema()`` output when available,
            otherwise a copy of its ``__annotations__``, otherwise a dict
            holding the schema's string representation.
        """
        if hasattr(self.schema, 'model_json_schema'):
            return self.schema.model_json_schema()
        elif hasattr(self.schema, '__annotations__'):
            return dict(self.schema.__annotations__)
        else:
            return {"schema": str(self.schema)}
class RangeValidator(IValidationFunction):
    """Validate that numeric values are within specified ranges."""

    def __init__(
        self,
        field_ranges: Dict[str, Dict[str, Union[int, float]]],
    ):
        """Initialize the validator.

        Args:
            field_ranges: Dictionary mapping field names to range specifications.
                Each range can have 'min', 'max', or both (bounds are inclusive).
        """
        self.field_ranges = field_ranges

    def validate(self, data: Dict[str, Any]) -> bool:
        """Validate that values are within specified ranges.

        Fields that are absent from ``data`` are skipped. All problems are
        collected and reported together.

        Args:
            data: Data to validate.

        Returns:
            True when the data is valid. Failures are signalled by
            raising, never by returning False.

        Raises:
            FSMValidationError: If any configured field is non-numeric,
                is NaN while a bound is configured, or lies outside its
                bounds.
        """
        errors = []

        for field, range_spec in self.field_ranges.items():
            if field not in data:
                continue

            value = data[field]
            # Note: bool passes this check because it subclasses int.
            if not isinstance(value, (int, float)):
                errors.append(f"{field}: Expected numeric value, got {type(value).__name__}")
                continue

            has_bound = "min" in range_spec or "max" in range_spec
            # NaN is the only value unequal to itself. Both ``<`` and ``>``
            # are False for NaN, so without this check it would slip
            # through every bound unnoticed.
            if has_bound and value != value:
                errors.append(f"{field}: Value {value} is not a number and cannot satisfy a range")
                continue

            if "min" in range_spec and value < range_spec["min"]:
                errors.append(f"{field}: Value {value} is below minimum {range_spec['min']}")

            if "max" in range_spec and value > range_spec["max"]:
                errors.append(f"{field}: Value {value} is above maximum {range_spec['max']}")

        if errors:
            raise FSMValidationError("; ".join(errors))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Get the validation rules."""
        return {
            "type": "range",
            "field_ranges": self.field_ranges
        }
class PatternValidator(IValidationFunction):
    """Check string fields against regular expressions."""

    def __init__(
        self,
        field_patterns: Dict[str, str],
        flags: int = 0,
    ):
        """Compile and store one regex per field.

        Args:
            field_patterns: Mapping of field name to regex source text.
            flags: Regex flags applied to every pattern (e.g., re.IGNORECASE).
        """
        self.field_patterns = {
            name: re.compile(expression, flags)
            for name, expression in field_patterns.items()
        }

    def validate(self, data: Dict[str, Any]) -> bool:
        """Check each configured field against its compiled pattern.

        Fields absent from ``data`` are skipped. Matching uses
        ``Pattern.match``, i.e. the pattern is anchored at the start of
        the value only.

        Args:
            data: Data to validate.

        Returns:
            True when the data is valid. Failures are signalled by
            raising, never by returning False.

        Raises:
            FSMValidationError: If any configured field is not a string
                or does not match its pattern; all problems are joined
                into one message.
        """
        problems = []

        for name, compiled in self.field_patterns.items():
            if name not in data:
                continue

            candidate = data[name]
            if isinstance(candidate, str):
                if compiled.match(candidate) is None:
                    problems.append(f"{name}: Value '{candidate}' does not match pattern")
            else:
                problems.append(f"{name}: Expected string value, got {type(candidate).__name__}")

        if problems:
            raise FSMValidationError("; ".join(problems))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Describe the configured patterns using their source text."""
        sources = {}
        for name, compiled in self.field_patterns.items():
            sources[name] = compiled.pattern
        return {
            "type": "pattern",
            "field_patterns": sources
        }
class TypeValidator(IValidationFunction):
    """Validate that fields have expected types."""

    def __init__(
        self,
        field_types: Dict[str, Union[type, List[type]]],
        strict: bool = False,
    ):
        """Initialize the validator.

        Args:
            field_types: Dictionary mapping field names to expected types.
                A value may be a single type or a list (or tuple) of
                allowed types.
            strict: If True, reject extra fields not in field_types.
        """
        self.field_types = field_types
        self.strict = strict

    def validate(self, data: Dict[str, Any]) -> bool:
        """Validate that fields have expected types.

        Fields absent from ``data`` are skipped. All problems are
        collected and reported together.

        Args:
            data: Data to validate.

        Returns:
            True when the data is valid. Failures are signalled by
            raising, never by returning False.

        Raises:
            FSMValidationError: If any field has an unexpected type, or
                (in strict mode) if ``data`` has fields that are not
                listed in ``field_types``.
        """
        errors = []

        # Check field types
        for field, expected_type in self.field_types.items():
            if field not in data:
                continue

            value = data[field]
            # Tuples are handled like lists; previously a tuple of types
            # crashed with AttributeError when building the error message.
            if isinstance(expected_type, (list, tuple)):
                # Multiple allowed types
                if not any(isinstance(value, t) for t in expected_type):
                    type_names = ", ".join(t.__name__ for t in expected_type)
                    errors.append(
                        f"{field}: Expected one of [{type_names}], "
                        f"got {type(value).__name__}"
                    )
            elif not isinstance(value, expected_type):
                # Single expected type
                errors.append(
                    f"{field}: Expected {expected_type.__name__}, "
                    f"got {type(value).__name__}"
                )

        # Check for extra fields if strict mode
        if self.strict:
            extra_fields = set(data.keys()) - set(self.field_types.keys())
            if extra_fields:
                # Stringify and sort: the message is then deterministic
                # and non-string keys no longer break ``str.join``.
                listed = ", ".join(sorted(str(name) for name in extra_fields))
                errors.append(f"Unexpected fields: {listed}")

        if errors:
            raise FSMValidationError("; ".join(errors))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Get the validation rules, with types rendered as their names."""
        field_type_names = {}
        for field, ftype in self.field_types.items():
            if isinstance(ftype, (list, tuple)):
                field_type_names[field] = [t.__name__ for t in ftype]
            else:
                field_type_names[field] = ftype.__name__

        return {
            "type": "type_check",
            "field_types": field_type_names,
            "strict": self.strict
        }
class LengthValidator(IValidationFunction):
    """Check that sized values have an acceptable length."""

    def __init__(
        self,
        field_lengths: Dict[str, Dict[str, int]],
    ):
        """Store the per-field length constraints.

        Args:
            field_lengths: Mapping of field name to a length specification
                that may contain 'min', 'max', and/or 'exact'.
        """
        self.field_lengths = field_lengths

    def validate(self, data: Dict[str, Any]) -> bool:
        """Check the length of every configured field found in ``data``.

        Fields absent from ``data`` are skipped. For each present field the
        'exact', 'min' and 'max' constraints are checked independently, in
        that order.

        Args:
            data: Data to validate.

        Returns:
            True when the data is valid. Failures are signalled by
            raising, never by returning False.

        Raises:
            FSMValidationError: If any value has no ``__len__`` or violates
                a constraint; all problems are joined into one message.
        """
        problems = []

        for name, spec in self.field_lengths.items():
            if name not in data:
                continue

            candidate = data[name]
            if not hasattr(candidate, "__len__"):
                problems.append(f"{name}: Value does not have a length")
                continue

            size = len(candidate)

            if "exact" in spec and size != spec["exact"]:
                problems.append(
                    f"{name}: Length {size} does not match expected {spec['exact']}"
                )
            if "min" in spec and size < spec["min"]:
                problems.append(f"{name}: Length {size} is below minimum {spec['min']}")
            if "max" in spec and size > spec["max"]:
                problems.append(f"{name}: Length {size} is above maximum {spec['max']}")

        if problems:
            raise FSMValidationError("; ".join(problems))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Describe this validator's configuration as a dictionary."""
        rules: Dict[str, Any] = {"type": "length"}
        rules["field_lengths"] = self.field_lengths
        return rules
class UniqueValidator(IValidationFunction):
    """Validate that values in collections are unique."""

    def __init__(
        self,
        fields: List[str],
        key: str | None = None,
    ):
        """Initialize the validator.

        Args:
            fields: List of field names to check for uniqueness.
            key: Optional key to extract from collection items for uniqueness check.
                Dict items are indexed with it; other items are read via
                attribute access.
        """
        self.fields = fields
        self.key = key

    @staticmethod
    def _find_duplicates(values: List[Any]) -> List[str]:
        """Return string forms of repeated values, in first-repeat order."""
        seen_hashable = set()
        # Unhashable values (lists, dicts, ...) cannot go in a set, so they
        # are tracked in a list and compared by equality instead.
        seen_unhashable = []
        # A dict is used as an insertion-ordered set so the resulting
        # message is deterministic.
        duplicates: Dict[str, None] = {}

        for v in values:
            try:
                repeated = v in seen_hashable
                if not repeated:
                    seen_hashable.add(v)
            except TypeError:
                repeated = v in seen_unhashable
                if not repeated:
                    seen_unhashable.append(v)

            if repeated:
                duplicates[str(v)] = None

        return list(duplicates)

    def validate(self, data: Dict[str, Any]) -> bool:
        """Validate that values are unique.

        Fields absent from ``data`` are skipped. All problems are
        collected and reported together.

        Args:
            data: Data to validate.

        Returns:
            True when the data is valid. Failures are signalled by
            raising, never by returning False.

        Raises:
            FSMValidationError: If a field is not a list, tuple or set, if
                the configured key cannot be extracted from an item, or if
                duplicate values are found.
        """
        errors = []

        for field in self.fields:
            if field not in data:
                continue

            value = data[field]
            if not isinstance(value, (list, tuple, set)):
                errors.append(f"{field}: Expected collection, got {type(value).__name__}")
                continue

            if self.key:
                # Extract values using key
                try:
                    values = [item[self.key] if isinstance(item, dict) else getattr(item, self.key)
                              for item in value]
                except (KeyError, AttributeError) as e:
                    errors.append(f"{field}: Cannot extract key '{self.key}': {e}")
                    continue
            else:
                values = list(value)

            # Check for duplicates
            duplicates = self._find_duplicates(values)
            if duplicates:
                errors.append(f"{field}: Duplicate values found: {', '.join(duplicates)}")

        if errors:
            raise FSMValidationError("; ".join(errors))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Get the validation rules."""
        return {
            "type": "unique",
            "fields": self.fields,
            "key": self.key
        }
class DependencyValidator(IValidationFunction):
    """Enforce "if field A is present, field B must be present too" rules."""

    def __init__(
        self,
        dependencies: Dict[str, Union[str, List[str]]],
    ):
        """Store the dependency table.

        Args:
            dependencies: Mapping of a field name to the field name (or
                list of field names) that must accompany it.
        """
        self.dependencies = dependencies

    def validate(self, data: Dict[str, Any]) -> bool:
        """Check that each present field has all of its dependencies.

        Rules whose triggering field is absent from ``data`` are skipped.

        Args:
            data: Data to validate.

        Returns:
            True when the data is valid. Failures are signalled by
            raising, never by returning False.

        Raises:
            FSMValidationError: If any present field is missing one or more
                of its dependencies; all problems are joined into one
                message.
        """
        problems = []

        for name, required in self.dependencies.items():
            if name not in data:
                continue

            # Normalise a single dependency into a one-element list.
            if not isinstance(required, list):
                required = [required]

            absent = [dep for dep in required if dep not in data]
            if absent:
                problems.append(
                    f"Field '{name}' requires: {', '.join(absent)}"
                )

        if problems:
            raise FSMValidationError("; ".join(problems))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Describe this validator's configuration as a dictionary."""
        rules: Dict[str, Any] = {"type": "dependency"}
        rules["dependencies"] = self.dependencies
        return rules
class CompositeValidator(IValidationFunction):
    """Run several validators as if they were one."""

    def __init__(
        self,
        validators: List[IValidationFunction],
        stop_on_first_error: bool = False,
    ):
        """Store the validators and the failure policy.

        Args:
            validators: Validators to run, in order.
            stop_on_first_error: If True, the first failure is re-raised
                immediately instead of being collected.
        """
        self.validators = validators
        self.stop_on_first_error = stop_on_first_error

    def validate(self, data: Dict[str, Any]) -> bool:
        """Run every validator against the data.

        Args:
            data: Data to validate.

        Returns:
            True if all validators pass.

        Raises:
            FSMValidationError: The first validator's error when
                ``stop_on_first_error`` is set; otherwise a single error
                whose message joins every collected failure with "; ".
        """
        failures = []

        for check in self.validators:
            try:
                check.validate(data)
            except FSMValidationError as exc:
                if not self.stop_on_first_error:
                    failures.append(str(exc))
                    continue
                # Fail fast: propagate the original error unchanged.
                raise

        if failures:
            raise FSMValidationError("; ".join(failures))

        return True
555# Convenience functions for creating validators
def required_fields(*fields: str, allow_none: bool = False) -> RequiredFieldsValidator:
    """Build a RequiredFieldsValidator from positional field names.

    Args:
        *fields: Names of the fields that must be present.
        allow_none: Forwarded to the validator's ``allow_none`` option.

    Returns:
        A RequiredFieldsValidator configured with the given names.
    """
    names = list(fields)
    return RequiredFieldsValidator(names, allow_none)
def schema(model: Union[type[BaseModel], Dict[str, Any]]) -> SchemaValidator:
    """Build a SchemaValidator for a model class or schema dictionary.

    Args:
        model: Pydantic model class or schema dictionary, forwarded
            unchanged to SchemaValidator.

    Returns:
        A SchemaValidator wrapping ``model``.
    """
    validator = SchemaValidator(model)
    return validator
def range_check(**field_ranges: Dict[str, Union[int, float]]) -> RangeValidator:
    """Build a RangeValidator from keyword arguments.

    Args:
        **field_ranges: One keyword per field; each value is a range
            specification forwarded to RangeValidator.

    Returns:
        A RangeValidator for the given field ranges.
    """
    validator = RangeValidator(field_ranges)
    return validator
def pattern(**field_patterns: str) -> PatternValidator:
    """Build a PatternValidator from keyword arguments.

    Args:
        **field_patterns: One keyword per field; each value is the regex
            source text for that field. No regex flags are passed.

    Returns:
        A PatternValidator for the given field patterns.
    """
    validator = PatternValidator(field_patterns)
    return validator
def type_check(**field_types: Union[type, List[type]]) -> TypeValidator:
    """Build a TypeValidator from keyword arguments.

    Args:
        **field_types: One keyword per field; each value is the expected
            type or list of allowed types. Strict mode is left at the
            validator's default.

    Returns:
        A TypeValidator for the given field types.
    """
    validator = TypeValidator(field_types)
    return validator
def length(**field_lengths: Dict[str, int]) -> LengthValidator:
    """Build a LengthValidator from keyword arguments.

    Args:
        **field_lengths: One keyword per field; each value is a length
            specification forwarded to LengthValidator.

    Returns:
        A LengthValidator for the given field lengths.
    """
    validator = LengthValidator(field_lengths)
    return validator
def unique(*fields: str, key: str | None = None) -> UniqueValidator:
    """Build a UniqueValidator from positional field names.

    Args:
        *fields: Names of the collection fields to check for duplicates.
        key: Optional key forwarded to the validator for extracting the
            compared value from each item.

    Returns:
        A UniqueValidator for the given fields.
    """
    names = list(fields)
    return UniqueValidator(names, key)
def depends_on(**dependencies: Union[str, List[str]]) -> DependencyValidator:
    """Build a DependencyValidator from keyword arguments.

    Args:
        **dependencies: One keyword per field; each value is the field
            name (or list of names) that the field requires.

    Returns:
        A DependencyValidator for the given dependency table.
    """
    validator = DependencyValidator(dependencies)
    return validator