Coverage for src/csv_schema_validator/validator.py: 17%

71 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-20 12:34 +0200

1import re 

2 

3 

4class FieldValidator: 

5 @staticmethod 

6 def validate_row( 

7 row: list[str], header: list[str], schema: dict, row_number: int 

8 ) -> bool: 

9 fields_schema_dict = FieldValidator.dict_array_to_dict(schema["fields"], "name") 

10 result = [] 

11 

12 for i in range(len(row)): 

13 if header[i] in fields_schema_dict: 

14 result.append( 

15 FieldValidator.is_field_valid( 

16 row[i], fields_schema_dict[header[i]], row_number, header[i] 

17 ) 

18 ) 

19 

20 return result 

21 

22 @staticmethod 

23 def is_field_valid( 

24 field: str, field_schema: dict, row_number: int, column: str 

25 ) -> bool: 

26 checks = ["type", "enum", "pattern", "min", "max"] 

27 

28 result = [] 

29 

30 for check in checks: 

31 if check in field_schema: 

32 if check == "type": 

33 type_result = FieldValidator.is_type_valid( 

34 field, field_schema[check], row_number, column 

35 ) 

36 if type_result: 

37 result.append(type_result) 

38 if check == "enum": 

39 if field not in field_schema[check]: 

40 result.append( 

41 { 

42 "error_type": "not_in_enum", 

43 "value": field, 

44 "column": column, 

45 "row": row_number, 

46 "details": { 

47 "expected_enum": field_schema[check], 

48 }, 

49 } 

50 ) 

51 if check == "pattern": 

52 if not re.match(field_schema[check], field): 

53 result.append( 

54 { 

55 "error_type": "pattern_not_matched", 

56 "value": field, 

57 "column": column, 

58 "row": row_number, 

59 "details": { 

60 "expected_pattern": field_schema[check], 

61 }, 

62 } 

63 ) 

64 if check == "min": 

65 try: 

66 if float(field) < field_schema[check]: 

67 result.append( 

68 { 

69 "error_type": "number_less_than_minimum", 

70 "value": field, 

71 "column": column, 

72 "row": row_number, 

73 "details": { 

74 "expected_min": field_schema[check], 

75 }, 

76 } 

77 ) 

78 except ValueError: 

79 result.append( 

80 { 

81 "error_type": "not_a_number", 

82 "value": field, 

83 "column": column, 

84 "row": row_number, 

85 "details": {}, 

86 } 

87 ) 

88 if check == "max": 

89 try: 

90 if float(field) > field_schema[check]: 

91 result.append( 

92 { 

93 "error_type": "max_exceeded", 

94 "value": field, 

95 "column": column, 

96 "row": row_number, 

97 "details": { 

98 "expected_max": field_schema[check], 

99 }, 

100 } 

101 ) 

102 except ValueError: 

103 result.append( 

104 { 

105 "error_type": "invalid_number_value", 

106 "value": field, 

107 "column": column, 

108 "row": row_number, 

109 "details": {}, 

110 } 

111 ) 

112 

113 return {"is_valid": len(result) == 0, "errors": result} 

114 

115 @staticmethod 

116 def is_type_valid( 

117 field: str, field_type: str, row_number: int, column: str 

118 ) -> bool: 

119 supported_types = ["string", "number", "boolean", "integer"] 

120 

121 if field_type not in supported_types: 

122 return { 

123 "error_type": "unsupported_field_type", 

124 "value": field_type, 

125 "column": column, 

126 "row": -1, 

127 "details": { 

128 "supported_types": supported_types, 

129 }, 

130 } 

131 

132 if field_type == "string": 

133 return None 

134 

135 if field_type == "number": 

136 try: 

137 float(field) 

138 return None 

139 except ValueError: 

140 return { 

141 "error_type": "invalid_number", 

142 "value": field, 

143 "column": column, 

144 "row": row_number, 

145 "details": {}, 

146 } 

147 

148 if field_type == "integer": 

149 try: 

150 int(field) 

151 return None 

152 except ValueError: 

153 return { 

154 "error_type": "invalid_integer", 

155 "value": field, 

156 "column": column, 

157 "row": row_number, 

158 "details": {}, 

159 } 

160 

161 if field_type == "boolean": 

162 if field.lower() in ["true", "false"]: 

163 return None 

164 else: 

165 return { 

166 "error_type": "invalid_boolean", 

167 "value": field, 

168 "column": column, 

169 "row": row_number, 

170 "details": { 

171 "supported_values": ["true", "false"], 

172 }, 

173 } 

174 

175 return None 

176 

177 @staticmethod 

178 def dict_array_to_dict(array: list[dict], by_key: str) -> dict: 

179 return {item[by_key]: item for item in array} 

180 

181 @staticmethod 

182 def validate_required_fields(header: list[str], required_fields: list[str]) -> bool: 

183 if not set(required_fields).issubset(set(header)): 

184 return { 

185 "is_valid": False, 

186 "errors": [ 

187 { 

188 "error_type": "missing_fields", 

189 "value": header, 

190 "row": -1, 

191 "column": -1, 

192 "details": { 

193 "required_fields": required_fields, 

194 "missing_fields": list(set(required_fields) - set(header)), 

195 }, 

196 } 

197 ], 

198 } 

199 else: 

200 return {"is_valid": True, "errors": []}