Coverage for src/csv_schema_validator/core/validator.py: 90%

30 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-23 15:34 +0100

1""" 

2Schema validation module for CSV schemas. 

3""" 

4from __future__ import annotations 

5 

6from typing import Any 

7from pydantic import ValidationError 

8 

9from .models import CSVSchema 

10from .types import SchemaDict, ValidationResult 

11 

12 

13class SchemaValidationError(Exception): 

14 """Raised when schema validation fails.""" 

15 

16 def __init__(self, message: str, field_path: str | None = None, 

17 details: dict[str, Any] | None = None): 

18 super().__init__(message) 

19 self.message = message 

20 self.field_path = field_path 

21 self.details = details or {} 

22 

23 def to_dict(self) -> dict[str, Any]: 

24 """Convert exception to dictionary format for JSON serialization.""" 

25 return { 

26 "error_type": self.__class__.__name__, 

27 "error_message": self.message, 

28 "row": None, 

29 "column": None, 

30 "value": None, 

31 "details": self.details 

32 } 

33 

34 

35def validate_schema_structure(schema_dict: SchemaDict) -> ValidationResult: 

36 """ 

37 Validate the structure of a CSV schema. 

38 

39 Args: 

40 schema_dict: Dictionary containing the schema definition 

41 

42 Returns: 

43 Dictionary with validation results: 

44 - is_valid: Boolean indicating if schema is valid 

45 - errors: List of validation errors 

46 - validated_schema: Pydantic model instance if valid 

47 """ 

48 try: 

49 validated_schema = CSVSchema(**schema_dict) 

50 return {"is_valid": True, "errors": [], "validated_schema": validated_schema} 

51 except ValidationError as e: 

52 errors = [] 

53 for error in e.errors(): 

54 field_path = ".".join(str(x) for x in error["loc"]) 

55 schema_error = SchemaValidationError( 

56 message=error["msg"], 

57 field_path=field_path, 

58 details={"input": error["input"], "type": error["type"]} 

59 ) 

60 errors.append(schema_error.to_dict()) 

61 

62 return {"is_valid": False, "errors": errors, "validated_schema": None} 

63 except TypeError as e: 

64 schema_error = SchemaValidationError( 

65 message=str(e), 

66 details={"schema_dict": schema_dict} 

67 ) 

68 return { 

69 "is_valid": False, 

70 "errors": [schema_error.to_dict()], 

71 "validated_schema": None, 

72 } 

73 except Exception as e: 

74 schema_error = SchemaValidationError( 

75 message=f"Unexpected error during schema validation: {str(e)}", 

76 details={"schema_dict": schema_dict} 

77 ) 

78 return { 

79 "is_valid": False, 

80 "errors": [schema_error.to_dict()], 

81 "validated_schema": None, 

82 }