Coverage for src/csv_schema_validator/core/validator.py: 90%
30 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-23 15:34 +0100
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-23 15:34 +0100
1"""
2Schema validation module for CSV schemas.
3"""
4from __future__ import annotations
6from typing import Any
7from pydantic import ValidationError
9from .models import CSVSchema
10from .types import SchemaDict, ValidationResult
class SchemaValidationError(Exception):
    """Raised when schema validation fails.

    Attributes:
        message: Human-readable description of the failure.
        field_path: Location of the offending field inside the schema as
            supplied by the caller, or ``None`` when no location was given.
        details: Extra context about the failure; an empty dict when none
            was supplied.
    """

    def __init__(
        self,
        message: str,
        field_path: str | None = None,
        details: dict[str, Any] | None = None,
    ) -> None:
        """Store the message, optional field location and optional details."""
        super().__init__(message)
        self.message = message
        self.field_path = field_path
        self.details = details or {}

    def to_dict(self) -> dict[str, Any]:
        """Convert exception to dictionary format for JSON serialization.

        Returns:
            A dict with the keys ``error_type``, ``error_message``,
            ``field_path``, ``row``, ``column``, ``value`` and ``details``.
            ``row``, ``column`` and ``value`` are always ``None`` here.
        """
        return {
            "error_type": self.__class__.__name__,
            "error_message": self.message,
            # Without the field path a message such as "Field required"
            # cannot be traced back to the schema entry that caused it.
            "field_path": self.field_path,
            "row": None,
            "column": None,
            "value": None,
            "details": self.details,
        }
def validate_schema_structure(schema_dict: SchemaDict) -> ValidationResult:
    """
    Validate the structure of a CSV schema.

    The schema dictionary is unpacked into ``CSVSchema``. Exceptions raised
    by that construction are converted into serialized
    ``SchemaValidationError`` entries instead of propagating to the caller.

    Args:
        schema_dict: Dictionary containing the schema definition

    Returns:
        Dictionary with validation results:
        - is_valid: Boolean indicating if schema is valid
        - errors: List of validation errors (dicts produced by
          ``SchemaValidationError.to_dict``); empty when valid
        - validated_schema: Pydantic model instance if valid, else ``None``
    """
    try:
        # Keep the try body to the single call whose failures are handled.
        validated_schema = CSVSchema(**schema_dict)
    except ValidationError as e:
        # One entry per pydantic error; "loc" is joined into a dotted path.
        errors = [
            SchemaValidationError(
                message=error["msg"],
                field_path=".".join(str(part) for part in error["loc"]),
                # .get(): a missing "input" key must not raise KeyError from
                # inside this handler, where no sibling clause would catch it.
                details={"input": error.get("input"), "type": error["type"]},
            ).to_dict()
            for error in e.errors()
        ]
        return {"is_valid": False, "errors": errors, "validated_schema": None}
    except TypeError as e:
        # Raised e.g. when schema_dict cannot be unpacked as keyword arguments.
        schema_error = SchemaValidationError(
            message=str(e),
            details={"schema_dict": schema_dict},
        )
        return {
            "is_valid": False,
            "errors": [schema_error.to_dict()],
            "validated_schema": None,
        }
    except Exception as e:
        # Boundary handler: report any other failure as a validation error.
        schema_error = SchemaValidationError(
            message=f"Unexpected error during schema validation: {str(e)}",
            details={"schema_dict": schema_dict},
        )
        return {
            "is_valid": False,
            "errors": [schema_error.to_dict()],
            "validated_schema": None,
        }

    return {"is_valid": True, "errors": [], "validated_schema": validated_schema}