Coverage for src / tracekit / config / schema.py: 96%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""JSON Schema validation system for TraceKit configuration. 

2 

3This module provides a flexible schema validation system using JSON Schema 

4for validating configuration files including protocols, pipelines, and 

5threshold configurations. 

6 

7 

8Example: 

9 >>> from tracekit.config.schema import validate_against_schema 

10 >>> config = {"name": "uart", "baud_rate": 115200} 

11 >>> validate_against_schema(config, "protocol") 

12 True 

13""" 

14 

15from __future__ import annotations 

16 

17from dataclasses import dataclass 

18from typing import Any 

19 

20# Try to import jsonschema for full validation 

21try: 

22 import jsonschema # noqa: F401 

23 from jsonschema import Draft7Validator 

24 from jsonschema import ValidationError as JsonSchemaError 

25 

26 JSONSCHEMA_AVAILABLE = True 

27except ImportError: 

28 JSONSCHEMA_AVAILABLE = False 

29 JsonSchemaError = Exception # type: ignore[ignore-without-code] 

30 

31from tracekit.core.exceptions import ConfigurationError 

32from tracekit.core.exceptions import ValidationError as TraceKitValidationError 

33 

34 

class ValidationError(TraceKitValidationError):
    """Schema validation error with detailed location information.

    All keyword arguments given to the constructor are kept on the instance
    so callers can build their own report from them.

    Attributes:
        path: JSON path to the invalid field.
        line: Line number in source file (if available).
        column: Column number in source file (if available).
        schema_path: Path in schema where validation failed.
        expected: Expected value or type (if known).
        actual: Actual value found (if known).
        suggestion: Suggested fix (if any).
    """

    def __init__(
        self,
        message: str,
        *,
        path: str | None = None,
        line: int | None = None,
        column: int | None = None,
        schema_path: str | None = None,
        expected: Any = None,
        actual: Any = None,
        suggestion: str | None = None,
    ) -> None:
        """Initialize ValidationError.

        Args:
            message: Description of the validation failure.
            path: JSON path to invalid field (e.g., "protocol.timing.baud_rate").
            line: Line number in source file.
            column: Column number in source file.
            schema_path: Path in schema where validation failed.
            expected: Expected value or type.
            actual: Actual value found.
            suggestion: Suggested fix.
        """
        self.path = path
        self.line = line
        self.column = column
        self.schema_path = schema_path
        self.expected = expected
        self.actual = actual
        self.suggestion = suggestion

        # The parent exception receives the message untouched; path,
        # schema_path and actual are forwarded under the parent's own
        # keyword names (field / constraint / value).
        super().__init__(
            message,
            field=path,
            constraint=schema_path,
            value=actual,
        )

97 

98 

@dataclass
class ConfigSchema:
    """A JSON Schema document bundled with identifying metadata.

    Attributes:
        name: Schema identifier (e.g., "protocol", "pipeline").
        version: Schema version string (semver format).
        schema: JSON Schema dictionary.
        description: Human-readable description.
        uri: Optional URI for schema reference.

    Raises:
        ValueError: If ``name``, ``version`` or ``schema`` is empty.
    """

    name: str
    version: str
    schema: dict[str, Any]
    description: str = ""
    uri: str | None = None

    def __post_init__(self) -> None:
        """Reject instances whose mandatory fields are empty."""
        # Checked in declaration order so the first empty field wins.
        mandatory = (
            (self.name, "Schema name cannot be empty"),
            (self.version, "Schema version cannot be empty"),
            (self.schema, "Schema cannot be empty"),
        )
        for value, complaint in mandatory:
            if not value:
                raise ValueError(complaint)

    @property
    def full_uri(self) -> str:
        """Get full schema URI.

        Returns:
            The explicit ``uri`` when one was provided (and is non-empty),
            otherwise a URN generated from the schema name and version.
        """
        return self.uri or f"urn:tracekit:schemas:{self.name}:v{self.version}"

136 

137 

class SchemaRegistry:
    """Central registry for all configuration schemas.

    Schemas are stored in nested dictionaries keyed by name and then by
    version, so lookups are plain dict accesses.

    Example:
        >>> registry = SchemaRegistry()
        >>> registry.register(protocol_schema)
        >>> schema = registry.get("protocol")
    """

    def __init__(self) -> None:
        """Initialize empty schema registry."""
        # name -> version -> schema
        self._schemas: dict[str, dict[str, ConfigSchema]] = {}
        # name -> version returned by get() when no version is requested
        self._default_versions: dict[str, str] = {}

    def register(
        self,
        schema: ConfigSchema,
        *,
        set_default: bool = True,
    ) -> None:
        """Register a schema with the registry.

        Args:
            schema: Schema to register.
            set_default: If True, set as default version for this schema name.

        Raises:
            ValueError: If schema with same name and version already exists.
        """
        versions = self._schemas.setdefault(schema.name, {})

        if schema.version in versions:
            raise ValueError(f"Schema '{schema.name}' v{schema.version} already registered")

        versions[schema.version] = schema

        if set_default:
            self._default_versions[schema.name] = schema.version

    def get(
        self,
        name: str,
        version: str | None = None,
    ) -> ConfigSchema | None:
        """Get schema by name and optional version.

        Args:
            name: Schema name (e.g., "protocol").
            version: Specific version or None for default.

        Returns:
            ConfigSchema if found, None otherwise. None is also returned
            when no version is requested and the name has no default
            version (every version was registered with
            ``set_default=False``).
        """
        versions = self._schemas.get(name)
        if versions is None:
            return None

        if version is None:
            version = self._default_versions.get(name)
            if version is None:
                return None

        return versions.get(version)

    def list_schemas(self) -> list[str]:
        """List all registered schema names.

        Returns:
            List of schema names, in registration order.
        """
        return list(self._schemas)

    def list_versions(self, name: str) -> list[str]:
        """List all versions of a schema.

        Args:
            name: Schema name.

        Returns:
            List of version strings in registration order; empty if the
            name is unknown.
        """
        return list(self._schemas.get(name, ()))

    def has_schema(self, name: str, version: str | None = None) -> bool:
        """Check if schema exists.

        Args:
            name: Schema name.
            version: Specific version or None for any.

        Returns:
            True if schema exists. With ``version=None`` this only checks
            that the name is known, regardless of whether a default
            version is set.
        """
        versions = self._schemas.get(name)
        if versions is None:
            return False
        return version is None or version in versions

241 

242 

# Process-wide registry instance; stays None until first requested.
_global_registry: SchemaRegistry | None = None


def get_schema_registry() -> SchemaRegistry:
    """Return the shared schema registry, creating it on first use.

    The first call builds a new SchemaRegistry and fills it with the
    built-in schemas; later calls hand back that same instance.

    Returns:
        Global SchemaRegistry instance.
    """
    global _global_registry

    registry = _global_registry
    if registry is None:
        registry = SchemaRegistry()
        # Publish the instance before populating it, then load the built-ins.
        _global_registry = registry
        _register_builtin_schemas(registry)

    return registry

262 

263 

def register_schema(
    schema: ConfigSchema,
    *,
    set_default: bool = True,
) -> None:
    """Add a schema to the global registry.

    Convenience wrapper that forwards to ``register`` on the registry
    returned by ``get_schema_registry()``.

    Args:
        schema: Schema to register.
        set_default: If True, set as default version.
    """
    registry = get_schema_registry()
    registry.register(schema, set_default=set_default)

276 

277 

def validate_against_schema(
    config: dict[str, Any],
    schema_name: str,
    *,
    version: str | None = None,
    strict: bool = False,
) -> bool:
    """Validate configuration against a registered schema.

    Args:
        config: Configuration dictionary to validate.
        schema_name: Name of schema to validate against.
        version: Specific schema version or None for default.
        strict: If True, fail on additional top-level properties. This only
            takes effect when the schema does not already declare
            ``additionalProperties`` itself.

    Returns:
        True if validation passes.

    Raises:
        ValidationError: If validation fails with detailed error info
            (describes the first error reported by the validator).
        ConfigurationError: If schema not found or jsonschema not available.
    """
    if not JSONSCHEMA_AVAILABLE:
        raise ConfigurationError(
            "JSON Schema validation not available",
            fix_hint="Install jsonschema: pip install jsonschema",
        )

    registry = get_schema_registry()
    schema_obj = registry.get(schema_name, version)

    if schema_obj is None:
        available = registry.list_schemas()
        raise ConfigurationError(
            f"Schema '{schema_name}' not found",
            details=f"Available schemas: {available}",
        )

    # Shallow copy: the strict flag below only adds a top-level key, and it
    # must not leak into the schema stored in the registry.
    schema = schema_obj.schema.copy()

    if strict and "additionalProperties" not in schema:
        schema["additionalProperties"] = False

    try:
        validator = Draft7Validator(schema)
        # Only the first error is reported, so there is no need to collect
        # the rest.
        error = next(iter(validator.iter_errors(config)), None)
    except JsonSchemaError as e:
        raised_path = ".".join(str(p) for p in e.absolute_path) if e.absolute_path else None
        raise ValidationError(
            str(e.message),
            path=raised_path,
            schema_path=".".join(str(p) for p in e.absolute_schema_path)
            if e.absolute_schema_path
            else None,
        ) from e

    if error is None:
        return True

    path = ".".join(str(p) for p in error.absolute_path) or "(root)"

    # Try to provide helpful suggestion
    suggestion = _get_error_suggestion(error)

    # Draft 7 allows boolean subschemas; for a ``False`` schema the failing
    # error carries that bool as ``error.schema``, which has no ``.get``.
    failing_schema = error.schema if isinstance(error.schema, dict) else {}

    raise ValidationError(
        str(error.message),
        path=path,
        schema_path=".".join(str(p) for p in error.absolute_schema_path),
        expected=failing_schema.get("type") or failing_schema.get("enum"),
        actual=error.instance,
        suggestion=suggestion,
    )

354 

355 

356def _get_error_suggestion(error: Any) -> str | None: 

357 """Generate suggestion for common validation errors. 

358 

359 Args: 

360 error: jsonschema ValidationError. 

361 

362 Returns: 

363 Suggestion string or None. 

364 """ 

365 msg = error.message.lower() 

366 

367 if "is not of type" in msg: 

368 expected_type = error.schema.get("type", "unknown") 

369 return f"Convert value to {expected_type}" 

370 

371 if "is not valid under any of the given schemas" in msg: 

372 return "Check value matches one of the allowed formats" 

373 

374 if "is a required property" in msg: 

375 return "Add the missing required field" 

376 

377 if "additional properties" in msg: 

378 return "Remove unrecognized fields or use additionalProperties: true" 

379 

380 if "does not match" in msg: 

381 pattern = error.schema.get("pattern") 

382 if pattern: 382 ↛ 385line 382 didn't jump to line 385 because the condition on line 382 was always true

383 return f"Value must match pattern: {pattern}" 

384 

385 return None 

386 

387 

def _register_builtin_schemas(registry: SchemaRegistry) -> None:
    """Populate a registry with the schemas shipped in this module.

    Registers, in order: ``protocol``, ``pipeline``, ``logic_family``,
    ``threshold_profile`` and ``preferences``, each as version "1.0.0"
    and each becoming the default version for its name.

    Args:
        registry: Registry to populate.
    """
    draft7 = "http://json-schema.org/draft-07/schema#"
    semver = "^\\d+\\.\\d+\\.\\d+$"

    def typed(kind: str, **keywords: Any) -> dict[str, Any]:
        """Build a fresh ``{"type": kind, ...}`` node; keyword order is kept."""
        return {"type": kind, **keywords}

    def volts(description: str, ceiling: int) -> dict[str, Any]:
        """Build a number node bounded to the range 0..ceiling."""
        return typed("number", description=description, minimum=0, maximum=ceiling)

    # Protocol definition schema
    protocol = {
        "$schema": draft7,
        "type": "object",
        "required": ["name"],
        "properties": {
            "name": typed(
                "string",
                description="Protocol identifier",
                pattern="^[a-z][a-z0-9_]*$",
            ),
            "version": typed(
                "string",
                description="Protocol version (semver)",
                pattern=semver,
            ),
            "description": typed("string"),
            "author": typed("string"),
            "timing": typed(
                "object",
                properties={
                    "baud_rates": typed("array", items=typed("integer", minimum=1)),
                    "data_bits": typed(
                        "array",
                        items=typed("integer", minimum=1, maximum=32),
                    ),
                    "stop_bits": typed(
                        "array",
                        items=typed("number", minimum=0.5, maximum=2),
                    ),
                    "parity": typed(
                        "array",
                        items=typed(
                            "string",
                            enum=["none", "even", "odd", "mark", "space"],
                        ),
                    ),
                },
            ),
            "voltage_levels": typed(
                "object",
                properties={
                    "logic_family": typed("string"),
                    "idle_state": typed("string", enum=["high", "low"]),
                    "mark_voltage": typed("number"),
                    "space_voltage": typed("number"),
                },
            ),
            "state_machine": typed(
                "object",
                properties={
                    "states": typed("array", items=typed("string")),
                    "initial_state": typed("string"),
                    "transitions": typed(
                        "array",
                        items=typed(
                            "object",
                            required=["from", "to", "condition"],
                            properties={
                                "from": typed("string"),
                                "to": typed("string"),
                                "condition": typed("string"),
                            },
                        ),
                    ),
                },
            ),
        },
        "additionalProperties": True,
    }

    # Pipeline definition schema
    pipeline = {
        "$schema": draft7,
        "type": "object",
        "required": ["name", "steps"],
        "properties": {
            "name": typed("string", description="Pipeline identifier"),
            "version": typed("string", pattern=semver),
            "description": typed("string"),
            "steps": typed(
                "array",
                minItems=1,
                items=typed(
                    "object",
                    required=["name", "type"],
                    properties={
                        "name": typed("string"),
                        "type": typed("string"),
                        "params": typed("object"),
                        "inputs": typed("object"),
                        "outputs": typed("object"),
                    },
                ),
            ),
            "parallel_groups": typed(
                "array",
                items=typed("array", items=typed("string")),
            ),
        },
    }

    # Logic family schema
    logic_family = {
        "$schema": draft7,
        "type": "object",
        "required": ["name", "VIH", "VIL", "VOH", "VOL"],
        "properties": {
            "name": typed("string", description="Logic family name"),
            "description": typed("string"),
            "VIH": volts("Input high voltage threshold (V)", 10),
            "VIL": volts("Input low voltage threshold (V)", 10),
            "VOH": volts("Output high voltage (V)", 10),
            "VOL": volts("Output low voltage (V)", 10),
            "VCC": volts("Supply voltage (V)", 15),
            "temperature_range": typed(
                "object",
                properties={
                    "min": typed("number"),
                    "max": typed("number"),
                },
            ),
            "noise_margin_high": typed(
                "number",
                description="High state noise margin (V)",
            ),
            "noise_margin_low": typed(
                "number",
                description="Low state noise margin (V)",
            ),
        },
    }

    # Threshold profile schema
    threshold_profile = {
        "$schema": draft7,
        "type": "object",
        "required": ["name"],
        "properties": {
            "name": typed("string"),
            "description": typed("string"),
            "base_family": typed(
                "string",
                description="Base logic family to extend",
            ),
            "overrides": typed("object", additionalProperties=typed("number")),
            "tolerance": typed(
                "number",
                description="Tolerance percentage (0-100)",
                minimum=0,
                maximum=100,
                default=0,
            ),
        },
    }

    # Preferences schema
    preferences = {
        "$schema": draft7,
        "type": "object",
        "properties": {
            "defaults": typed(
                "object",
                properties={
                    "sample_rate": typed("number", minimum=0),
                    "window_function": typed("string"),
                    "fft_size": typed("integer", minimum=1),
                },
            ),
            "visualization": typed(
                "object",
                properties={
                    "style": typed("string"),
                    "figure_size": typed(
                        "array",
                        items=typed("number"),
                        minItems=2,
                        maxItems=2,
                    ),
                    "dpi": typed("integer", minimum=50, maximum=600),
                    "colormap": typed("string"),
                },
            ),
            "export": typed(
                "object",
                properties={
                    "default_format": typed(
                        "string",
                        enum=["csv", "hdf5", "npz", "json"],
                    ),
                    "precision": typed("integer", minimum=1, maximum=15),
                },
            ),
            "logging": typed(
                "object",
                properties={
                    "level": typed(
                        "string",
                        enum=["DEBUG", "INFO", "WARNING", "ERROR"],
                    ),
                    "file": typed("string"),
                },
            ),
        },
    }

    # Registration order matches the order the schemas are defined above.
    catalog = (
        ("protocol", "Protocol decoder configuration", protocol),
        ("pipeline", "Analysis pipeline configuration", pipeline),
        ("logic_family", "Logic family voltage thresholds", logic_family),
        ("threshold_profile", "Analysis threshold profile", threshold_profile),
        ("preferences", "User preferences", preferences),
    )
    for schema_name, summary, document in catalog:
        registry.register(
            ConfigSchema(
                name=schema_name,
                version="1.0.0",
                description=summary,
                schema=document,
            )
        )

704 

705 

706__all__ = [ 

707 "ConfigSchema", 

708 "SchemaRegistry", 

709 "ValidationError", 

710 "get_schema_registry", 

711 "register_schema", 

712 "validate_against_schema", 

713]