Coverage for agentos/tools/data_validator.py: 98%

95 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 08:37 +0800

1""" 

2DataValidator — schema-based data validation with custom rules. 

3 

4Supports: 

5 - Type validation (str, int, float, bool, list, dict) 

6 - Required/optional fields 

7 - Nullable fields 

8 - Min/max for numbers and strings 

9 - Enum (allowed values) 

10 - Regex pattern matching 

11 - Nested object validation 

12 - List item validation 

13 - Custom validator functions 

14 - Human-readable error messages 

15""" 

16 

17from __future__ import annotations 

18 

19import re 

20from typing import Any, Callable, Dict, List, Optional, Tuple, Union 

21 

22 

23# ============================================================================ 

24# Schema definition 

25# ============================================================================ 

26 

class Field:
    """Declarative description of one entry in a validation schema.

    A ``Field`` only stores constraints; it performs no validation itself.

    Args:
        field_type: Type the value must be an instance of.
        required: Whether the key must be present in the validated data.
        nullable: Whether ``None`` is an acceptable value.
        min_value: Lower bound for numeric values.
        max_value: Upper bound for numeric values.
        min_length: Minimum length for string values.
        max_length: Maximum length for string values.
        enum: Allowed values; ``None`` means no restriction.
        pattern: Regular expression source. It is compiled once here and
            exposed as a compiled pattern (or ``None``) on ``self.pattern``.
        custom: Callable receiving the value and returning an error message,
            or a falsy value when the value is acceptable.
        nested: Sub-schema applied to dict values.
        items: Field definition applied to each element of a list value.
    """

    def __init__(
        self,
        field_type: type,
        required: bool = True,
        nullable: bool = False,
        min_value: Optional[Union[int, float]] = None,
        max_value: Optional[Union[int, float]] = None,
        min_length: Optional[int] = None,
        max_length: Optional[int] = None,
        enum: Optional[List[Any]] = None,
        pattern: Optional[str] = None,
        custom: Optional[Callable[[Any], Optional[str]]] = None,
        # Nesting
        nested: Optional[Dict[str, "Field"]] = None,
        items: Optional["Field"] = None,
    ):
        # Presence / nullability
        self.field_type, self.required, self.nullable = field_type, required, nullable

        # Range and length bounds
        self.min_value, self.max_value = min_value, max_value
        self.min_length, self.max_length = min_length, max_length

        # Value restrictions; an empty or missing pattern means "no regex".
        self.enum = enum
        self.pattern = None if not pattern else re.compile(pattern)
        self.custom = custom

        # Structural children
        self.nested, self.items = nested, items

58 

59 

60# ============================================================================ 

61# Validator 

62# ============================================================================ 

63 

class ValidationError(Exception):
    """Signal that validation failed.

    The individual messages are kept on ``errors``; the exception text is
    those messages joined with newlines.
    """

    def __init__(self, errors: List[str]):
        message = "\n".join(errors)
        super().__init__(message)
        self.errors = errors

70 

71 

class DataValidator:
    r"""Schema-based data validator.

    Usage:
        schema = {
            "name": Field(str, min_length=1, max_length=100),
            "age": Field(int, min_value=0, max_value=150),
            "email": Field(str, pattern=r"^[^@]+@[^@]+\.[^@]+$"),
            "tags": Field(list, items=Field(str)),
        }

        validator = DataValidator(schema)
        try:
            cleaned = validator.validate(data)
        except ValidationError as exc:
            ...  # exc.errors holds one message per problem
    """

    def __init__(self, schema: Dict[str, "Field"]):
        self._schema = schema

    def validate(self, data: dict) -> dict:
        """Validate data against the schema.

        Returns:
            A new dict containing only the schema-declared keys.

        Raises:
            ValidationError: If any problem was found; its ``errors``
                attribute lists every message collected.
        """
        errors: List[str] = []
        cleaned = self._validate_dict(data, self._schema, "", errors)
        if errors:
            raise ValidationError(errors)
        return cleaned

    def is_valid(self, data: dict) -> bool:
        """Check if data is valid without raising."""
        try:
            self.validate(data)
            return True
        except ValidationError:
            return False

    def errors(self, data: dict) -> List[str]:
        """Return the list of validation error messages (empty when valid)."""
        errors_list: List[str] = []
        self._validate_dict(data, self._schema, "", errors_list)
        return errors_list

    # ---------- Internal ----------

    def _validate_dict(
        self, data: dict, schema: Dict[str, "Field"], path: str, errors: List[str]
    ) -> dict:
        """Validate ``data`` against ``schema``, appending messages to ``errors``.

        ``path`` is the dotted prefix used in messages ("" at the root).
        Returns the cleaned dict, or ``{}`` when ``data`` is not a dict.
        """
        if not isinstance(data, dict):
            errors.append(f"{path or '(root)'}: expected dict, got {type(data).__name__}")
            return {}

        result = {}

        for name, field in schema.items():
            fpath = f"{path}.{name}" if path else name
            if name not in data:
                if field.required:
                    errors.append(f"{fpath}: required field missing")
                continue

            validated = self._validate_value(data[name], field, fpath, errors)
            # None is only kept in the output for fields that allow it.
            if validated is not None or field.nullable:
                result[name] = validated

        # Keys present in data but absent from the schema are not reported;
        # they are simply left out of the cleaned result.
        return result

    def _validate_value(self, value: Any, field: "Field", path: str, errors: List[str]) -> Any:
        """Validate one value against ``field``.

        Returns the (possibly cleaned) value, or ``None`` when the value is
        ``None`` or has the wrong type. All problems are appended to
        ``errors`` rather than raised.
        """
        # Nullable check
        if value is None:
            if not field.nullable:
                errors.append(f"{path}: value is None but field is not nullable")
            return None

        # Type check. bool is a subclass of int, so isinstance(True, int) is
        # true; without the extra guard a boolean would pass as an integer
        # and then be range-checked as 0/1.
        expected = field.field_type
        type_ok = isinstance(value, expected)
        if type_ok and expected is int and isinstance(value, bool):
            type_ok = False
        if not type_ok:
            errors.append(f"{path}: expected {expected.__name__}, got {type(value).__name__}")
            return None

        # Min/max for numbers
        if expected in (int, float):
            if field.min_value is not None and value < field.min_value:
                errors.append(f"{path}: value {value} < min {field.min_value}")
            if field.max_value is not None and value > field.max_value:
                errors.append(f"{path}: value {value} > max {field.max_value}")

        # Length for strings
        if expected is str:
            if field.min_length is not None and len(value) < field.min_length:
                errors.append(f"{path}: length {len(value)} < min {field.min_length}")
            if field.max_length is not None and len(value) > field.max_length:
                errors.append(f"{path}: length {len(value)} > max {field.max_length}")

        # Enum
        if field.enum is not None and value not in field.enum:
            errors.append(f"{path}: {value!r} not in {field.enum}")

        # Pattern (regex); non-string values are matched on their str() form.
        if field.pattern and not field.pattern.search(str(value)):
            errors.append(f"{path}: {value!r} does not match pattern")

        # Nested object
        if field.nested and isinstance(value, dict):
            value = self._validate_dict(value, field.nested, path, errors)

        # List items
        if field.items and isinstance(value, list):
            value = self._validate_list(value, field.items, path, errors)

        # Custom validator runs last, on the cleaned nested/list value.
        if field.custom:
            msg = field.custom(value)
            if msg:
                errors.append(f"{path}: {msg}")

        return value

    def _validate_list(self, data: list, item_field: "Field", path: str, errors: List[str]) -> list:
        """Validate every element of ``data`` against ``item_field``.

        Element paths are reported as ``path[index]``. The returned list has
        the same length as ``data``; invalid elements become ``None``.
        """
        return [
            self._validate_value(item, item_field, f"{path}[{i}]", errors)
            for i, item in enumerate(data)
        ]