Coverage for src/dataknobs_fsm/functions/library/validators.py: 0%

225 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 14:11 -0700

1"""Built-in validator functions for FSM. 

2 

3This module provides commonly used validation functions that can be 

4referenced in FSM configurations. 

5""" 

6 

7import re 

8from typing import Any, Dict, List, Union 

9 

10from pydantic import BaseModel, ValidationError 

11 

12from dataknobs_fsm.functions.base import IValidationFunction, ValidationError as FSMValidationError 

13 

14 

class RequiredFieldsValidator(IValidationFunction):
    """Check that a dict contains every field in a required set."""

    def __init__(self, fields: List[str], allow_none: bool = False):
        """Store the validator configuration.

        Args:
            fields: Names of the fields that must be present.
            allow_none: When False, a field that is present but holds
                None is rejected as well.
        """
        self.allow_none = allow_none
        self.fields = fields

    def validate(self, data: Dict[str, Any]) -> bool:
        """Confirm that every required field is present in ``data``.

        Args:
            data: Data to validate; must be a dict.

        Returns:
            True when the check passes. Failures are reported by raising,
            never by returning False.

        Raises:
            FSMValidationError: If ``data`` is not a dict, if any required
                field is absent, or (when ``allow_none`` is False) if any
                required field holds None.
        """
        if not isinstance(data, dict):
            raise FSMValidationError(f"Expected dict, got {type(data).__name__}")

        # Absent fields are reported first and on their own.
        absent = [name for name in self.fields if name not in data]
        if absent:
            raise FSMValidationError(
                f"Missing required fields: {', '.join(absent)}"
            )

        # Every required field is present from here on, so indexing is safe.
        if not self.allow_none:
            null_valued = [name for name in self.fields if data[name] is None]
            if null_valued:
                raise FSMValidationError(
                    f"Fields cannot be None: {', '.join(null_valued)}"
                )

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Describe this validator's configuration as a dictionary."""
        rules: Dict[str, Any] = {}
        rules["required_fields"] = self.fields
        rules["allow_none"] = self.allow_none
        return rules

70 

71 

class SchemaValidator(IValidationFunction):
    """Validate data against a Pydantic schema.

    The schema is either a Pydantic model class, used as-is, or a dictionary
    of field definitions from which a model named ``DynamicSchema`` is built
    with ``pydantic.create_model``.
    """

    def __init__(self, schema: Union[type[BaseModel], Dict[str, Any]]):
        """Initialize the validator.

        Args:
            schema: Pydantic model class or schema dictionary. A dictionary
                is expanded as keyword arguments to ``create_model``.
        """
        if isinstance(schema, dict):
            # Create a dynamic Pydantic model from dictionary
            from pydantic import create_model
            self.schema = create_model("DynamicSchema", **schema)
        else:
            self.schema = schema

    def validate(self, data: Dict[str, Any]) -> bool:
        """Validate data against the schema.

        Args:
            data: Mapping of field names to values; it is expanded as
                keyword arguments to the schema model.

        Returns:
            True when the data satisfies the schema. Failures are reported
            by raising, never by returning False.

        Raises:
            FSMValidationError: If ``data`` is not a mapping, or if the
                schema rejects it. In the latter case the message lists each
                failing field path with its error message.
        """
        # ``self.schema(**data)`` requires a mapping. Without this guard a
        # non-mapping input (None, a list, ...) surfaces as a bare TypeError
        # instead of the validation error callers are told to expect.
        if not hasattr(data, "keys"):
            raise FSMValidationError(f"Expected dict, got {type(data).__name__}")

        try:
            self.schema(**data)
        except ValidationError as e:
            errors = []
            for error in e.errors():
                # "loc" is a sequence of path components; join into a.b.c form.
                field_path = ".".join(str(loc) for loc in error["loc"])
                errors.append(f"{field_path}: {error['msg']}")

            raise FSMValidationError(
                f"Schema validation failed: {'; '.join(errors)}"
            ) from e

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Get the validation rules.

        Returns:
            The model's JSON schema when the schema object offers
            ``model_json_schema``; otherwise a copy of its annotations;
            otherwise a dictionary holding the schema's string form.
        """
        if hasattr(self.schema, 'model_json_schema'):
            return self.schema.model_json_schema()
        elif hasattr(self.schema, '__annotations__'):
            return dict(self.schema.__annotations__)
        else:
            return {"schema": str(self.schema)}

121 

122 

class RangeValidator(IValidationFunction):
    """Check that numeric fields fall inside configured bounds."""

    def __init__(
        self,
        field_ranges: Dict[str, Dict[str, Union[int, float]]],
    ):
        """Store the per-field bounds.

        Args:
            field_ranges: Maps a field name to its range specification,
                which may carry a 'min' entry, a 'max' entry, or both.
        """
        self.field_ranges = field_ranges

    def validate(self, data: Dict[str, Any]) -> bool:
        """Check every configured field that is present in ``data``.

        Fields missing from ``data`` are skipped. All problems found are
        collected and reported together.

        Args:
            data: Data to validate.

        Returns:
            True when no problem is found. Failures are reported by raising,
            never by returning False.

        Raises:
            FSMValidationError: If any checked value is not an int/float or
                lies outside its bounds.
        """
        problems: List[str] = []
        for name, bounds in self.field_ranges.items():
            if name in data:
                problems.extend(self._range_problems(name, data[name], bounds))

        if problems:
            raise FSMValidationError("; ".join(problems))

        return True

    @staticmethod
    def _range_problems(
        name: str,
        current: Any,
        bounds: Dict[str, Union[int, float]],
    ) -> List[str]:
        """Return the error messages for one field's value (possibly none)."""
        if not isinstance(current, (int, float)):
            return [f"{name}: Expected numeric value, got {type(current).__name__}"]

        found: List[str] = []
        if "min" in bounds and current < bounds["min"]:
            found.append(f"{name}: Value {current} is below minimum {bounds['min']}")
        if "max" in bounds and current > bounds["max"]:
            found.append(f"{name}: Value {current} is above maximum {bounds['max']}")
        return found

    def get_validation_rules(self) -> Dict[str, Any]:
        """Describe this validator's configuration as a dictionary."""
        rules: Dict[str, Any] = {"type": "range"}
        rules["field_ranges"] = self.field_ranges
        return rules

178 

179 

class PatternValidator(IValidationFunction):
    """Check that string fields match configured regular expressions."""

    def __init__(
        self,
        field_patterns: Dict[str, str],
        flags: int = 0,
    ):
        """Compile the per-field patterns once, up front.

        Args:
            field_patterns: Maps a field name to a regex pattern string.
            flags: Regex flags applied to every pattern (e.g., re.IGNORECASE).
        """
        self.field_patterns = {
            name: re.compile(source, flags)
            for name, source in field_patterns.items()
        }

    def validate(self, data: Dict[str, Any]) -> bool:
        """Check every configured field that is present in ``data``.

        Fields missing from ``data`` are skipped. Matching uses
        ``Pattern.match``, so the pattern is anchored at the start of the
        value only. All problems found are reported together.

        Args:
            data: Data to validate.

        Returns:
            True when no problem is found. Failures are reported by raising,
            never by returning False.

        Raises:
            FSMValidationError: If any checked value is not a string or does
                not match its pattern.
        """
        failures = []

        for name, regex in self.field_patterns.items():
            if name in data:
                candidate = data[name]
                if not isinstance(candidate, str):
                    failures.append(
                        f"{name}: Expected string value, got {type(candidate).__name__}"
                    )
                elif regex.match(candidate) is None:
                    failures.append(f"{name}: Value '{candidate}' does not match pattern")

        if failures:
            raise FSMValidationError("; ".join(failures))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Describe this validator, reporting patterns as their source strings."""
        sources = {}
        for name, regex in self.field_patterns.items():
            sources[name] = regex.pattern
        return {"type": "pattern", "field_patterns": sources}

235 

236 

class TypeValidator(IValidationFunction):
    """Validate that fields have expected types."""

    def __init__(
        self,
        field_types: Dict[str, Union[type, List[type]]],
        strict: bool = False,
    ):
        """Initialize the validator.

        Args:
            field_types: Dictionary mapping field names to expected types.
                A value may be a single type or a list (or tuple) of
                allowed types.
            strict: If True, reject extra fields not in field_types.
        """
        self.field_types = field_types
        self.strict = strict

    def validate(self, data: Dict[str, Any]) -> bool:
        """Validate that fields have expected types.

        Fields listed in ``field_types`` but missing from ``data`` are
        skipped. All problems found are reported together.

        Args:
            data: Data to validate.

        Returns:
            True when no problem is found. Failures are reported by raising,
            never by returning False.

        Raises:
            FSMValidationError: If any present field has an unexpected type,
                or, in strict mode, if ``data`` has fields that are not
                listed in ``field_types``.
        """
        errors = []

        # Check field types
        for field, expected_type in self.field_types.items():
            if field not in data:
                continue

            value = data[field]
            # Tuples are handled like lists: a tuple has no ``__name__``, so
            # sending it down the single-type branch would crash while the
            # error message is being built.
            if isinstance(expected_type, (list, tuple)):
                # Multiple allowed types
                if not any(isinstance(value, t) for t in expected_type):
                    type_names = ", ".join(t.__name__ for t in expected_type)
                    errors.append(
                        f"{field}: Expected one of [{type_names}], "
                        f"got {type(value).__name__}"
                    )
            elif not isinstance(value, expected_type):
                # Single expected type
                errors.append(
                    f"{field}: Expected {expected_type.__name__}, "
                    f"got {type(value).__name__}"
                )

        # Check for extra fields if strict mode
        if self.strict:
            # Stringify and sort so the message is deterministic (set order
            # is arbitrary) and non-string keys cannot break str.join.
            extra_fields = sorted(
                str(name) for name in set(data.keys()) - set(self.field_types.keys())
            )
            if extra_fields:
                errors.append(f"Unexpected fields: {', '.join(extra_fields)}")

        if errors:
            raise FSMValidationError("; ".join(errors))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Get the validation rules, with types reported by name."""
        field_type_names = {}
        for field, ftype in self.field_types.items():
            if isinstance(ftype, (list, tuple)):
                field_type_names[field] = [t.__name__ for t in ftype]
            else:
                field_type_names[field] = ftype.__name__
        return {
            "type": "type_check",
            "field_types": field_type_names,
            "strict": self.strict
        }

314 

315 

class LengthValidator(IValidationFunction):
    """Check that sized fields have lengths within configured limits."""

    def __init__(
        self,
        field_lengths: Dict[str, Dict[str, int]],
    ):
        """Store the per-field length limits.

        Args:
            field_lengths: Maps a field name to its length specification,
                which may carry 'min', 'max', and/or 'exact' entries.
        """
        self.field_lengths = field_lengths

    def validate(self, data: Dict[str, Any]) -> bool:
        """Check every configured field that is present in ``data``.

        Fields missing from ``data`` are skipped. All problems found are
        collected and reported together.

        Args:
            data: Data to validate.

        Returns:
            True when no problem is found. Failures are reported by raising,
            never by returning False.

        Raises:
            FSMValidationError: If any checked value has no ``__len__`` or
                its length violates the 'exact', 'min', or 'max' limit.
        """
        problems = []

        for name, spec in self.field_lengths.items():
            if name in data:
                item = data[name]
                if hasattr(item, "__len__"):
                    size = len(item)
                    if "exact" in spec and size != spec["exact"]:
                        problems.append(
                            f"{name}: Length {size} does not match expected {spec['exact']}"
                        )
                    if "min" in spec and size < spec["min"]:
                        problems.append(
                            f"{name}: Length {size} is below minimum {spec['min']}"
                        )
                    if "max" in spec and size > spec["max"]:
                        problems.append(
                            f"{name}: Length {size} is above maximum {spec['max']}"
                        )
                else:
                    problems.append(f"{name}: Value does not have a length")

        if problems:
            raise FSMValidationError("; ".join(problems))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Describe this validator's configuration as a dictionary."""
        rules: Dict[str, Any] = {"type": "length"}
        rules["field_lengths"] = self.field_lengths
        return rules

378 

379 

class UniqueValidator(IValidationFunction):
    """Validate that values in collections are unique."""

    def __init__(
        self,
        fields: List[str],
        key: str | None = None,
    ):
        """Initialize the validator.

        Args:
            fields: List of field names to check for uniqueness.
            key: Optional key to extract from collection items for uniqueness check.
                For dict items it is used as a dictionary key; for any other
                item it is read as an attribute.
        """
        self.fields = fields
        self.key = key

    def validate(self, data: Dict[str, Any]) -> bool:
        """Validate that values are unique.

        Fields missing from ``data`` are skipped. All problems found are
        reported together.

        Args:
            data: Data to validate.

        Returns:
            True when no problem is found. Failures are reported by raising,
            never by returning False.

        Raises:
            FSMValidationError: If a checked field is not a list, tuple, or
                set, if the key cannot be extracted from an item, or if
                duplicate values are present.
        """
        errors = []

        for field in self.fields:
            if field not in data:
                continue

            value = data[field]
            if not isinstance(value, (list, tuple, set)):
                errors.append(f"{field}: Expected collection, got {type(value).__name__}")
                continue

            if self.key:
                # Extract values using key
                try:
                    values = [item[self.key] if isinstance(item, dict) else getattr(item, self.key)
                              for item in value]
                except (KeyError, AttributeError) as e:
                    errors.append(f"{field}: Cannot extract key '{self.key}': {e}")
                    continue
            else:
                values = list(value)

            duplicates = self._find_duplicates(values)
            if duplicates:
                errors.append(f"{field}: Duplicate values found: {', '.join(duplicates)}")

        if errors:
            raise FSMValidationError("; ".join(errors))

        return True

    @staticmethod
    def _find_duplicates(values: List[Any]) -> List[str]:
        """Return the string form of each repeated value, in first-seen order.

        Hashable values are tracked in a set. Unhashable values (dicts,
        lists, ...) cannot be stored in a set, so they are tracked in a list
        and compared by equality rather than crashing with TypeError.
        """
        seen_hashable = set()
        seen_unhashable = []
        duplicates: List[str] = []

        for v in values:
            try:
                repeated = v in seen_hashable
                hashable = True
            except TypeError:
                # Set membership raises TypeError for an unhashable value.
                hashable = False
                repeated = v in seen_unhashable

            if repeated:
                label = str(v)
                # Report each duplicate once even if it occurs many times.
                if label not in duplicates:
                    duplicates.append(label)
            elif hashable:
                seen_hashable.add(v)
            else:
                seen_unhashable.append(v)

        return duplicates

    def get_validation_rules(self) -> Dict[str, Any]:
        """Get the validation rules."""
        return {
            "type": "unique",
            "fields": self.fields,
            "key": self.key
        }

454 

455 

class DependencyValidator(IValidationFunction):
    """Check field dependencies: when field A is present, field B must be too."""

    def __init__(
        self,
        dependencies: Dict[str, Union[str, List[str]]],
    ):
        """Store the dependency map.

        Args:
            dependencies: Maps a field name to the field (or list of fields)
                that must accompany it.
        """
        self.dependencies = dependencies

    def validate(self, data: Dict[str, Any]) -> bool:
        """Check the dependencies of every configured field present in ``data``.

        A field that is absent from ``data`` imposes no requirement. All
        problems found are collected and reported together.

        Args:
            data: Data to validate.

        Returns:
            True when no problem is found. Failures are reported by raising,
            never by returning False.

        Raises:
            FSMValidationError: If a present field is missing one or more of
                the fields it depends on.
        """
        problems = []

        for name, required in self.dependencies.items():
            if name in data:
                # A lone dependency may be given bare; normalise to a list.
                if not isinstance(required, list):
                    required = [required]

                absent = [dep for dep in required if dep not in data]
                if absent:
                    problems.append(
                        f"Field '{name}' requires: {', '.join(absent)}"
                    )

        if problems:
            raise FSMValidationError("; ".join(problems))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Describe this validator's configuration as a dictionary."""
        rules: Dict[str, Any] = {"type": "dependency"}
        rules["dependencies"] = self.dependencies
        return rules

508 

509 

class CompositeValidator(IValidationFunction):
    """Compose multiple validators into a single validator."""

    def __init__(
        self,
        validators: List[IValidationFunction],
        stop_on_first_error: bool = False,
    ):
        """Initialize the composite validator.

        Args:
            validators: List of validators to apply.
            stop_on_first_error: If True, stop at first validation error.
        """
        self.validators = validators
        self.stop_on_first_error = stop_on_first_error

    def validate(self, data: Dict[str, Any]) -> bool:
        """Apply all validators to the data.

        Validators run in list order. With ``stop_on_first_error`` the first
        failure is re-raised unchanged; otherwise every validator runs and
        the failure messages are joined into a single error.

        Args:
            data: Data to validate.

        Returns:
            True if all validators pass.

        Raises:
            FSMValidationError: If any validation fails.
        """
        errors = []

        for validator in self.validators:
            try:
                validator.validate(data)
            except FSMValidationError as e:
                if self.stop_on_first_error:
                    raise
                errors.append(str(e))

        if errors:
            raise FSMValidationError("; ".join(errors))

        return True

    def get_validation_rules(self) -> Dict[str, Any]:
        """Get the validation rules of this validator and its children.

        Every other validator in this module exposes this method; the
        composite reports each child's rules in application order.
        NOTE(review): the IValidationFunction base is not visible from this
        module - confirm whether it declares this method abstract or ships
        a default that this override should mirror.
        """
        return {
            "type": "composite",
            "validators": [
                validator.get_validation_rules() for validator in self.validators
            ],
            "stop_on_first_error": self.stop_on_first_error
        }

553 

554 

555# Convenience functions for creating validators 

def required_fields(*fields: str, allow_none: bool = False) -> RequiredFieldsValidator:
    """Build a RequiredFieldsValidator from positional field names."""
    return RequiredFieldsValidator(fields=[*fields], allow_none=allow_none)

559 

560 

def schema(model: Union[type[BaseModel], Dict[str, Any]]) -> SchemaValidator:
    """Build a SchemaValidator from a model class or schema dictionary."""
    return SchemaValidator(schema=model)

564 

565 

def range_check(**field_ranges: Dict[str, Union[int, float]]) -> RangeValidator:
    """Build a RangeValidator; each keyword is a field name with its range spec."""
    return RangeValidator(field_ranges=field_ranges)

569 

570 

def pattern(**field_patterns: str) -> PatternValidator:
    """Build a PatternValidator; each keyword is a field name with its regex."""
    return PatternValidator(field_patterns=field_patterns)

574 

575 

def type_check(**field_types: Union[type, List[type]]) -> TypeValidator:
    """Build a TypeValidator; each keyword is a field name with its expected type(s)."""
    return TypeValidator(field_types=field_types)

579 

580 

def length(**field_lengths: Dict[str, int]) -> LengthValidator:
    """Build a LengthValidator; each keyword is a field name with its length spec."""
    return LengthValidator(field_lengths=field_lengths)

584 

585 

def unique(*fields: str, key: str | None = None) -> UniqueValidator:
    """Build a UniqueValidator from positional field names and an optional key."""
    return UniqueValidator(fields=[*fields], key=key)

589 

590 

def depends_on(**dependencies: Union[str, List[str]]) -> DependencyValidator:
    """Build a DependencyValidator; each keyword is a field name with its dependencies."""
    return DependencyValidator(dependencies=dependencies)