Coverage for src/csv_schema_validator/validate_csv.py: 73%
45 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-23 15:07 +0100
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-23 15:07 +0100
1"""
2CSV validation module using custom exceptions.
3"""
4from __future__ import annotations
6import csv
7from pathlib import Path
9from .exceptions import (
10 CSVFileError,
11 EmptyFileError,
12 ValidationConfigurationError,
13)
14from .schema_validator import validate_schema_structure
15from .types import HeaderData, SchemaDict, ValidationResult
16from .validator import FieldValidator
def validate_csv(csv_file: str | Path, schema: SchemaDict) -> ValidationResult:
    """
    Validate a CSV file against a JSON schema.

    The schema itself is checked first; if it is rejected, that result is
    returned unchanged and the CSV file is never opened. Otherwise the file
    is read as UTF-8 (a leading byte-order mark, if any, is ignored), the
    header is checked for the schema's required fields, and every following
    record is validated. File-level problems (missing file, permission
    denied, undecodable bytes, any other exception raised while reading or
    validating) are reported in the returned result, not raised.

    Args:
        csv_file: The path to the CSV file to validate.
        schema: The JSON schema to validate the CSV file against.

    Returns:
        A dictionary containing the validation results:
            - is_valid: A boolean indicating if the CSV file is valid.
            - errors: A list of errors found in the CSV file.
              Each error is a dictionary containing the following keys:
                - error_type: The type of error.
                - error_message: Human-readable error message.
                - row: Row number where error occurred (-1 for header errors).
                - column: Column name where error occurred.
                - value: The value that caused the error.
                - details: Additional error details.
    """
    csv_path = Path(csv_file)

    # Validate schema first; a rejected schema short-circuits everything else.
    schema_validation_result = validate_schema_structure(schema)
    if not schema_validation_result["is_valid"]:
        return schema_validation_result

    # Extract required field names. A field entry without "name"/"required",
    # or a schema that is not subscriptable as expected, is reported as a
    # configuration error instead of propagating KeyError/TypeError.
    try:
        required_field_names = [
            field["name"] for field in schema["fields"] if field["required"]
        ]
    except (KeyError, TypeError) as e:
        error = ValidationConfigurationError(
            message=f"Invalid schema structure: {e!s}",
            details={"schema": schema},
        )
        return {"is_valid": False, "errors": [error.to_dict()]}

    all_errors = []

    try:
        # newline="" is required by the csv module so that line breaks inside
        # quoted fields are parsed as part of the field, not as record ends.
        # "utf-8-sig" decodes plain UTF-8 identically but drops a leading BOM,
        # which would otherwise be glued to the first header name.
        with open(csv_path, "r", encoding="utf-8-sig", newline="") as file:
            reader = csv.reader(file)
            try:
                header: HeaderData = next(reader)
            except StopIteration:
                # No rows at all, not even a header.
                error = EmptyFileError(str(csv_path), "CSV")
                return {"is_valid": False, "errors": [error.to_dict()]}

            # Validate required fields are present
            header_result = FieldValidator.validate_required_fields(
                header, required_field_names
            )
            if not header_result["is_valid"]:
                all_errors.extend(header_result["errors"])

            # Validate each row. Numbering starts at 2 because the header is
            # row 1; the count is per CSV record, so a record spanning several
            # physical lines still advances the number by one.
            for row_number, row in enumerate(reader, start=2):
                row_errors = FieldValidator.validate_row(
                    row, header, schema, row_number
                )
                all_errors.extend(row_errors)

    except FileNotFoundError:
        error = CSVFileError(
            message=f"CSV file not found: {csv_path}",
            file_path=str(csv_path),
        )
        return {"is_valid": False, "errors": [error.to_dict()]}
    except PermissionError:
        error = CSVFileError(
            message=f"Permission denied reading CSV file: {csv_path}",
            file_path=str(csv_path),
        )
        return {"is_valid": False, "errors": [error.to_dict()]}
    except UnicodeDecodeError as e:
        error = CSVFileError(
            message=f"Unable to decode CSV file: {e!s}",
            file_path=str(csv_path),
            details={"encoding_error": str(e)},
        )
        return {"is_valid": False, "errors": [error.to_dict()]}
    except Exception as e:
        # Boundary handler: anything else raised while reading or validating
        # (e.g. csv.Error, other OSError subclasses) is converted into a
        # structured error so callers always receive a result dictionary.
        error = CSVFileError(
            message=f"Unexpected error reading CSV file: {e!s}",
            file_path=str(csv_path),
            details={"error_type": type(e).__name__},
        )
        return {"is_valid": False, "errors": [error.to_dict()]}

    return {
        "is_valid": len(all_errors) == 0,
        "errors": all_errors,
    }