Coverage for src/csv_schema_validator/field_validators/validator.py: 94%

102 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-23 15:34 +0100

1""" 

2Field validation module for CSV field validation. 

3 

4This module provides comprehensive field validation functionality including 

5type validation, pattern matching, enum validation, range validation, and 

6required field checking. 

7""" 

8from __future__ import annotations 

9 

10import re 

11from typing import Any 

12 

13from ..core import BOOLEAN_VALUES 

14 

15from .exceptions import ( 

16 EnumValidationError, 

17 PatternValidationError, 

18 RangeValidationError, 

19 RequiredFieldError, 

20 TypeValidationError, 

21 ValidationConfigurationError, 

22) 

23 

24 

25class FieldValidator: 

26 """Validates individual fields and rows against schema definitions.""" 

27 

28 @staticmethod 

29 def validate_row( 

30 row: list[str], header: list[str], schema: dict[str, Any], row_number: int 

31 ) -> list[dict[str, Any]]: 

32 """ 

33 Validate a single row against the schema. 

34  

35 Args: 

36 row: List of field values 

37 header: List of column names 

38 schema: Schema definition dictionary 

39 row_number: Row number for error reporting 

40  

41 Returns: 

42 List of error dictionaries 

43 """ 

44 fields_schema_dict = FieldValidator.dict_array_to_dict(schema["fields"], "name") 

45 errors = [] 

46 

47 for i in range(len(row)): 

48 if i < len(header) and header[i] in fields_schema_dict: 

49 field_errors = FieldValidator.is_field_valid( 

50 row[i], fields_schema_dict[header[i]], row_number, header[i] 

51 ) 

52 errors.extend(field_errors) 

53 

54 return errors 

55 

56 @staticmethod 

57 def is_field_valid( 

58 field: str, field_schema: dict[str, Any], row_number: int, column: str 

59 ) -> list[dict[str, Any]]: 

60 """ 

61 Validate a single field against its schema definition. 

62  

63 Args: 

64 field: Field value to validate 

65 field_schema: Schema definition for this field 

66 row_number: Row number for error reporting 

67 column: Column name for error reporting 

68  

69 Returns: 

70 List of error dictionaries 

71 """ 

72 errors = [] 

73 

74 # Type validation (must be done first) 

75 type_error = FieldValidator.is_type_valid(field, field_schema["type"], row_number, column) 

76 if type_error: 

77 errors.append(type_error) 

78 # If type is invalid, don't validate other constraints 

79 return errors 

80 

81 # Additional validations only if type is valid 

82 if "enum" in field_schema: 

83 enum_error = FieldValidator.validate_enum(field, field_schema["enum"], row_number, column) 

84 if enum_error: 

85 errors.append(enum_error) 

86 

87 if "pattern" in field_schema: 

88 pattern_error = FieldValidator.validate_pattern(field, field_schema["pattern"], row_number, column) 

89 if pattern_error: 

90 errors.append(pattern_error) 

91 

92 if "min" in field_schema or "max" in field_schema: 

93 range_error = FieldValidator.validate_range( 

94 field, field_schema.get("min"), field_schema.get("max"), row_number, column 

95 ) 

96 if range_error: 

97 errors.append(range_error) 

98 

99 return errors 

100 

101 @staticmethod 

102 def is_type_valid(field: str, field_type: str, row_number: int, column: str) -> dict[str, Any] | None: 

103 """ 

104 Validate field type. 

105  

106 Args: 

107 field: Field value to validate 

108 field_type: Expected type 

109 row_number: Row number for error reporting 

110 column: Column name for error reporting 

111  

112 Returns: 

113 Error dictionary if validation fails, None if valid 

114 """ 

115 supported_types = frozenset({"string", "number", "integer", "boolean"}) 

116 

117 if field_type not in supported_types: 

118 error = ValidationConfigurationError( 

119 message=f"Unsupported field type: {field_type}", 

120 details={"supported_types": list(supported_types)} 

121 ) 

122 return error.to_dict() 

123 

124 if field_type == "string": 

125 return None 

126 

127 if field_type == "number": 

128 try: 

129 float(field) 

130 return None 

131 except ValueError: 

132 error = TypeValidationError(column, row_number, field, "number") 

133 return error.to_dict() 

134 

135 if field_type == "integer": 

136 try: 

137 int(field) 

138 return None 

139 except ValueError: 

140 error = TypeValidationError(column, row_number, field, "integer") 

141 return error.to_dict() 

142 

143 if field_type == "boolean": 

144 if field.lower() in BOOLEAN_VALUES: 

145 return None 

146 else: 

147 error = TypeValidationError(column, row_number, field, "boolean") 

148 error.details = {"supported_values": BOOLEAN_VALUES} 

149 return error.to_dict() 

150 

151 return None 

152 

153 @staticmethod 

154 def validate_pattern(field: str, pattern: str, row_number: int, column: str) -> dict[str, Any] | None: 

155 """ 

156 Validate field against regex pattern. 

157  

158 Args: 

159 field: Field value to validate 

160 pattern: Regex pattern to match 

161 row_number: Row number for error reporting 

162 column: Column name for error reporting 

163  

164 Returns: 

165 Error dictionary if validation fails, None if valid 

166 """ 

167 if not re.match(pattern, field): 

168 error = PatternValidationError(column, row_number, field, pattern) 

169 return error.to_dict() 

170 return None 

171 

172 @staticmethod 

173 def validate_enum(field: str, allowed_values: list[str], row_number: int, column: str) -> dict[str, Any] | None: 

174 """ 

175 Validate field against enum values. 

176  

177 Args: 

178 field: Field value to validate 

179 allowed_values: List of allowed values 

180 row_number: Row number for error reporting 

181 column: Column name for error reporting 

182  

183 Returns: 

184 Error dictionary if validation fails, None if valid 

185 """ 

186 if field not in allowed_values: 

187 error = EnumValidationError(column, row_number, field, allowed_values) 

188 return error.to_dict() 

189 return None 

190 

191 @staticmethod 

192 def validate_range(field: str, min_value: float | None, max_value: float | None, 

193 row_number: int, column: str) -> dict[str, Any] | None: 

194 """ 

195 Validate field against min/max range. 

196  

197 Args: 

198 field: Field value to validate 

199 min_value: Minimum allowed value 

200 max_value: Maximum allowed value 

201 row_number: Row number for error reporting 

202 column: Column name for error reporting 

203  

204 Returns: 

205 Error dictionary if validation fails, None if valid 

206 """ 

207 try: 

208 numeric_value = float(field) 

209 except ValueError: 

210 # Type validation should have caught this, but just in case 

211 error = TypeValidationError(column, row_number, field, "number") 

212 return error.to_dict() 

213 

214 if min_value is not None and numeric_value < min_value: 

215 error = RangeValidationError(column, row_number, field, min_value=min_value) 

216 return error.to_dict() 

217 

218 if max_value is not None and numeric_value > max_value: 

219 error = RangeValidationError(column, row_number, field, max_value=max_value) 

220 return error.to_dict() 

221 

222 return None 

223 

224 @staticmethod 

225 def dict_array_to_dict(array: list[dict[str, Any]], by_key: str) -> dict[str, dict[str, Any]]: 

226 """ 

227 Convert list of dictionaries to dictionary keyed by specified field. 

228  

229 Args: 

230 array: List of dictionaries 

231 by_key: Key to use for dictionary keys 

232  

233 Returns: 

234 Dictionary keyed by the specified field 

235 """ 

236 return {item[by_key]: item for item in array} 

237 

238 @staticmethod 

239 def validate_required_fields(header: list[str], required_fields: list[str]) -> dict[str, Any]: 

240 """ 

241 Validate that all required fields are present in the header. 

242  

243 Args: 

244 header: List of column names in CSV 

245 required_fields: List of required field names 

246  

247 Returns: 

248 Dictionary with validation results 

249 """ 

250 missing_fields = set(required_fields) - set(header) 

251 

252 if missing_fields: 

253 error = RequiredFieldError("", -1) # Header-level error 

254 error.message = f"Missing required fields: {', '.join(missing_fields)}" 

255 error.details = { 

256 "required_fields": required_fields, 

257 "missing_fields": list(missing_fields), 

258 "available_fields": header 

259 } 

260 return {"is_valid": False, "errors": [error.to_dict()]} 

261 else: 

262 return {"is_valid": True, "errors": []}