Coverage for src / tracekit / config / schema.py: 96%
138 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""JSON Schema validation system for TraceKit configuration.
3This module provides a flexible schema validation system using JSON Schema
4for validating configuration files including protocols, pipelines, and
5threshold configurations.
8Example:
9 >>> from tracekit.config.schema import validate_against_schema
10 >>> config = {"name": "uart", "baud_rate": 115200}
11 >>> validate_against_schema(config, "protocol")
12 True
13"""
15from __future__ import annotations
17from dataclasses import dataclass
18from typing import Any
20# Try to import jsonschema for full validation
21try:
22 import jsonschema # noqa: F401
23 from jsonschema import Draft7Validator
24 from jsonschema import ValidationError as JsonSchemaError
26 JSONSCHEMA_AVAILABLE = True
27except ImportError:
28 JSONSCHEMA_AVAILABLE = False
29 JsonSchemaError = Exception # type: ignore[ignore-without-code]
31from tracekit.core.exceptions import ConfigurationError
32from tracekit.core.exceptions import ValidationError as TraceKitValidationError
35class ValidationError(TraceKitValidationError):
36 """Schema validation error with detailed location information.
38 Attributes:
39 path: JSON path to the invalid field.
40 line: Line number in source file (if available).
41 column: Column number in source file (if available).
42 schema_path: Path in schema where validation failed.
43 """
45 def __init__(
46 self,
47 message: str,
48 *,
49 path: str | None = None,
50 line: int | None = None,
51 column: int | None = None,
52 schema_path: str | None = None,
53 expected: Any = None,
54 actual: Any = None,
55 suggestion: str | None = None,
56 ) -> None:
57 """Initialize ValidationError.
59 Args:
60 message: Description of the validation failure.
61 path: JSON path to invalid field (e.g., "protocol.timing.baud_rate").
62 line: Line number in source file.
63 column: Column number in source file.
64 schema_path: Path in schema where validation failed.
65 expected: Expected value or type.
66 actual: Actual value found.
67 suggestion: Suggested fix.
68 """
69 self.path = path
70 self.line = line
71 self.column = column
72 self.schema_path = schema_path
73 self.expected = expected
74 self.actual = actual
75 self.suggestion = suggestion
77 # Build detailed message
78 details_parts = []
79 if path:
80 details_parts.append(f"Path: {path}")
81 if line is not None:
82 location = f"Line {line}"
83 if column is not None: 83 ↛ 85line 83 didn't jump to line 85 because the condition on line 83 was always true
84 location += f", column {column}"
85 details_parts.append(location)
86 if expected is not None:
87 details_parts.append(f"Expected: {expected}")
88 if actual is not None:
89 details_parts.append(f"Got: {actual}")
91 super().__init__(
92 message,
93 field=path,
94 constraint=schema_path,
95 value=actual,
96 )
99@dataclass
100class ConfigSchema:
101 """Schema definition with metadata.
103 Attributes:
104 name: Schema identifier (e.g., "protocol", "pipeline").
105 version: Schema version (semver format).
106 schema: JSON Schema dictionary.
107 description: Human-readable description.
108 uri: Optional URI for schema reference.
109 """
111 name: str
112 version: str
113 schema: dict[str, Any]
114 description: str = ""
115 uri: str | None = None
117 def __post_init__(self) -> None:
118 """Validate schema after initialization."""
119 if not self.name:
120 raise ValueError("Schema name cannot be empty")
121 if not self.version:
122 raise ValueError("Schema version cannot be empty")
123 if not self.schema:
124 raise ValueError("Schema cannot be empty")
126 @property
127 def full_uri(self) -> str:
128 """Get full schema URI.
130 Returns:
131 URI for schema reference, or generated local path if not provided.
132 """
133 if self.uri:
134 return self.uri
135 return f"urn:tracekit:schemas:{self.name}:v{self.version}"
138class SchemaRegistry:
139 """Central registry for all configuration schemas.
141 Provides O(1) lookup of schemas by name and version.
143 Example:
144 >>> registry = SchemaRegistry()
145 >>> registry.register(protocol_schema)
146 >>> schema = registry.get("protocol")
147 """
149 def __init__(self) -> None:
150 """Initialize empty schema registry."""
151 self._schemas: dict[str, dict[str, ConfigSchema]] = {}
152 self._default_versions: dict[str, str] = {}
154 def register(
155 self,
156 schema: ConfigSchema,
157 *,
158 set_default: bool = True,
159 ) -> None:
160 """Register a schema with the registry.
162 Args:
163 schema: Schema to register.
164 set_default: If True, set as default version for this schema name.
166 Raises:
167 ValueError: If schema with same name and version already exists.
168 """
169 if schema.name not in self._schemas:
170 self._schemas[schema.name] = {}
172 if schema.version in self._schemas[schema.name]:
173 self._schemas[schema.name][schema.version]
174 raise ValueError(f"Schema '{schema.name}' v{schema.version} already registered")
176 self._schemas[schema.name][schema.version] = schema
178 if set_default:
179 self._default_versions[schema.name] = schema.version
181 def get(
182 self,
183 name: str,
184 version: str | None = None,
185 ) -> ConfigSchema | None:
186 """Get schema by name and optional version.
188 Args:
189 name: Schema name (e.g., "protocol").
190 version: Specific version or None for default.
192 Returns:
193 ConfigSchema if found, None otherwise.
194 """
195 if name not in self._schemas:
196 return None
198 if version is None:
199 version = self._default_versions.get(name)
200 if version is None: 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true
201 return None
203 return self._schemas[name].get(version)
205 def list_schemas(self) -> list[str]:
206 """List all registered schema names.
208 Returns:
209 List of schema names.
210 """
211 return list(self._schemas.keys())
213 def list_versions(self, name: str) -> list[str]:
214 """List all versions of a schema.
216 Args:
217 name: Schema name.
219 Returns:
220 List of version strings.
221 """
222 if name not in self._schemas:
223 return []
224 return list(self._schemas[name].keys())
226 def has_schema(self, name: str, version: str | None = None) -> bool:
227 """Check if schema exists.
229 Args:
230 name: Schema name.
231 version: Specific version or None for any.
233 Returns:
234 True if schema exists.
235 """
236 if name not in self._schemas:
237 return False
238 if version is None:
239 return True
240 return version in self._schemas[name]
243# Global schema registry
244_global_registry: SchemaRegistry | None = None
247def get_schema_registry() -> SchemaRegistry:
248 """Get the global schema registry.
250 Initializes with built-in schemas on first call.
252 Returns:
253 Global SchemaRegistry instance.
254 """
255 global _global_registry
257 if _global_registry is None:
258 _global_registry = SchemaRegistry()
259 _register_builtin_schemas(_global_registry)
261 return _global_registry
264def register_schema(
265 schema: ConfigSchema,
266 *,
267 set_default: bool = True,
268) -> None:
269 """Register a schema with the global registry.
271 Args:
272 schema: Schema to register.
273 set_default: If True, set as default version.
274 """
275 get_schema_registry().register(schema, set_default=set_default)
278def validate_against_schema(
279 config: dict[str, Any],
280 schema_name: str,
281 *,
282 version: str | None = None,
283 strict: bool = False,
284) -> bool:
285 """Validate configuration against a registered schema.
287 Args:
288 config: Configuration dictionary to validate.
289 schema_name: Name of schema to validate against.
290 version: Specific schema version or None for default.
291 strict: If True, fail on additional properties.
293 Returns:
294 True if validation passes.
296 Raises:
297 ValidationError: If validation fails with detailed error info.
298 ConfigurationError: If schema not found or jsonschema not available.
299 """
300 if not JSONSCHEMA_AVAILABLE: 300 ↛ 301line 300 didn't jump to line 301 because the condition on line 300 was never true
301 raise ConfigurationError(
302 "JSON Schema validation not available",
303 fix_hint="Install jsonschema: pip install jsonschema",
304 )
306 registry = get_schema_registry()
307 schema_obj = registry.get(schema_name, version)
309 if schema_obj is None:
310 available = registry.list_schemas()
311 raise ConfigurationError(
312 f"Schema '{schema_name}' not found",
313 details=f"Available schemas: {available}",
314 )
316 schema = schema_obj.schema.copy()
318 # Add strict mode
319 if strict and "additionalProperties" not in schema:
320 schema["additionalProperties"] = False
322 try:
323 validator = Draft7Validator(schema)
324 errors = list(validator.iter_errors(config))
326 if errors:
327 # Get first error for main message
328 error = errors[0]
329 path = ".".join(str(p) for p in error.absolute_path) or "(root)"
331 # Try to provide helpful suggestion
332 suggestion = _get_error_suggestion(error)
334 raise ValidationError(
335 str(error.message),
336 path=path,
337 schema_path=".".join(str(p) for p in error.absolute_schema_path),
338 expected=error.schema.get("type") or error.schema.get("enum"),
339 actual=error.instance,
340 suggestion=suggestion,
341 )
343 return True
345 except JsonSchemaError as e:
346 path = ".".join(str(p) for p in e.absolute_path) if e.absolute_path else None # type: ignore[assignment]
347 raise ValidationError(
348 str(e.message),
349 path=path,
350 schema_path=".".join(str(p) for p in e.absolute_schema_path)
351 if e.absolute_schema_path
352 else None,
353 ) from e
356def _get_error_suggestion(error: Any) -> str | None:
357 """Generate suggestion for common validation errors.
359 Args:
360 error: jsonschema ValidationError.
362 Returns:
363 Suggestion string or None.
364 """
365 msg = error.message.lower()
367 if "is not of type" in msg:
368 expected_type = error.schema.get("type", "unknown")
369 return f"Convert value to {expected_type}"
371 if "is not valid under any of the given schemas" in msg:
372 return "Check value matches one of the allowed formats"
374 if "is a required property" in msg:
375 return "Add the missing required field"
377 if "additional properties" in msg:
378 return "Remove unrecognized fields or use additionalProperties: true"
380 if "does not match" in msg:
381 pattern = error.schema.get("pattern")
382 if pattern: 382 ↛ 385line 382 didn't jump to line 385 because the condition on line 382 was always true
383 return f"Value must match pattern: {pattern}"
385 return None
388def _register_builtin_schemas(registry: SchemaRegistry) -> None:
389 """Register all built-in schemas.
391 Args:
392 registry: Registry to populate.
393 """
394 # Protocol definition schema
395 registry.register(
396 ConfigSchema(
397 name="protocol",
398 version="1.0.0",
399 description="Protocol decoder configuration",
400 schema={
401 "$schema": "http://json-schema.org/draft-07/schema#",
402 "type": "object",
403 "required": ["name"],
404 "properties": {
405 "name": {
406 "type": "string",
407 "description": "Protocol identifier",
408 "pattern": "^[a-z][a-z0-9_]*$",
409 },
410 "version": {
411 "type": "string",
412 "description": "Protocol version (semver)",
413 "pattern": "^\\d+\\.\\d+\\.\\d+$",
414 },
415 "description": {
416 "type": "string",
417 },
418 "author": {
419 "type": "string",
420 },
421 "timing": {
422 "type": "object",
423 "properties": {
424 "baud_rates": {
425 "type": "array",
426 "items": {"type": "integer", "minimum": 1},
427 },
428 "data_bits": {
429 "type": "array",
430 "items": {
431 "type": "integer",
432 "minimum": 1,
433 "maximum": 32,
434 },
435 },
436 "stop_bits": {
437 "type": "array",
438 "items": {
439 "type": "number",
440 "minimum": 0.5,
441 "maximum": 2,
442 },
443 },
444 "parity": {
445 "type": "array",
446 "items": {
447 "type": "string",
448 "enum": ["none", "even", "odd", "mark", "space"],
449 },
450 },
451 },
452 },
453 "voltage_levels": {
454 "type": "object",
455 "properties": {
456 "logic_family": {"type": "string"},
457 "idle_state": {"type": "string", "enum": ["high", "low"]},
458 "mark_voltage": {"type": "number"},
459 "space_voltage": {"type": "number"},
460 },
461 },
462 "state_machine": {
463 "type": "object",
464 "properties": {
465 "states": {
466 "type": "array",
467 "items": {"type": "string"},
468 },
469 "initial_state": {"type": "string"},
470 "transitions": {
471 "type": "array",
472 "items": {
473 "type": "object",
474 "required": ["from", "to", "condition"],
475 "properties": {
476 "from": {"type": "string"},
477 "to": {"type": "string"},
478 "condition": {"type": "string"},
479 },
480 },
481 },
482 },
483 },
484 },
485 "additionalProperties": True,
486 },
487 )
488 )
490 # Pipeline definition schema
491 registry.register(
492 ConfigSchema(
493 name="pipeline",
494 version="1.0.0",
495 description="Analysis pipeline configuration",
496 schema={
497 "$schema": "http://json-schema.org/draft-07/schema#",
498 "type": "object",
499 "required": ["name", "steps"],
500 "properties": {
501 "name": {
502 "type": "string",
503 "description": "Pipeline identifier",
504 },
505 "version": {
506 "type": "string",
507 "pattern": "^\\d+\\.\\d+\\.\\d+$",
508 },
509 "description": {
510 "type": "string",
511 },
512 "steps": {
513 "type": "array",
514 "minItems": 1,
515 "items": {
516 "type": "object",
517 "required": ["name", "type"],
518 "properties": {
519 "name": {"type": "string"},
520 "type": {"type": "string"},
521 "params": {"type": "object"},
522 "inputs": {"type": "object"},
523 "outputs": {"type": "object"},
524 },
525 },
526 },
527 "parallel_groups": {
528 "type": "array",
529 "items": {
530 "type": "array",
531 "items": {"type": "string"},
532 },
533 },
534 },
535 },
536 )
537 )
539 # Logic family schema
540 registry.register(
541 ConfigSchema(
542 name="logic_family",
543 version="1.0.0",
544 description="Logic family voltage thresholds",
545 schema={
546 "$schema": "http://json-schema.org/draft-07/schema#",
547 "type": "object",
548 "required": ["name", "VIH", "VIL", "VOH", "VOL"],
549 "properties": {
550 "name": {
551 "type": "string",
552 "description": "Logic family name",
553 },
554 "description": {
555 "type": "string",
556 },
557 "VIH": {
558 "type": "number",
559 "description": "Input high voltage threshold (V)",
560 "minimum": 0,
561 "maximum": 10,
562 },
563 "VIL": {
564 "type": "number",
565 "description": "Input low voltage threshold (V)",
566 "minimum": 0,
567 "maximum": 10,
568 },
569 "VOH": {
570 "type": "number",
571 "description": "Output high voltage (V)",
572 "minimum": 0,
573 "maximum": 10,
574 },
575 "VOL": {
576 "type": "number",
577 "description": "Output low voltage (V)",
578 "minimum": 0,
579 "maximum": 10,
580 },
581 "VCC": {
582 "type": "number",
583 "description": "Supply voltage (V)",
584 "minimum": 0,
585 "maximum": 15,
586 },
587 "temperature_range": {
588 "type": "object",
589 "properties": {
590 "min": {"type": "number"},
591 "max": {"type": "number"},
592 },
593 },
594 "noise_margin_high": {
595 "type": "number",
596 "description": "High state noise margin (V)",
597 },
598 "noise_margin_low": {
599 "type": "number",
600 "description": "Low state noise margin (V)",
601 },
602 },
603 },
604 )
605 )
607 # Threshold profile schema
608 registry.register(
609 ConfigSchema(
610 name="threshold_profile",
611 version="1.0.0",
612 description="Analysis threshold profile",
613 schema={
614 "$schema": "http://json-schema.org/draft-07/schema#",
615 "type": "object",
616 "required": ["name"],
617 "properties": {
618 "name": {
619 "type": "string",
620 },
621 "description": {
622 "type": "string",
623 },
624 "base_family": {
625 "type": "string",
626 "description": "Base logic family to extend",
627 },
628 "overrides": {
629 "type": "object",
630 "additionalProperties": {"type": "number"},
631 },
632 "tolerance": {
633 "type": "number",
634 "description": "Tolerance percentage (0-100)",
635 "minimum": 0,
636 "maximum": 100,
637 "default": 0,
638 },
639 },
640 },
641 )
642 )
644 # Preferences schema
645 registry.register(
646 ConfigSchema(
647 name="preferences",
648 version="1.0.0",
649 description="User preferences",
650 schema={
651 "$schema": "http://json-schema.org/draft-07/schema#",
652 "type": "object",
653 "properties": {
654 "defaults": {
655 "type": "object",
656 "properties": {
657 "sample_rate": {"type": "number", "minimum": 0},
658 "window_function": {"type": "string"},
659 "fft_size": {"type": "integer", "minimum": 1},
660 },
661 },
662 "visualization": {
663 "type": "object",
664 "properties": {
665 "style": {"type": "string"},
666 "figure_size": {
667 "type": "array",
668 "items": {"type": "number"},
669 "minItems": 2,
670 "maxItems": 2,
671 },
672 "dpi": {"type": "integer", "minimum": 50, "maximum": 600},
673 "colormap": {"type": "string"},
674 },
675 },
676 "export": {
677 "type": "object",
678 "properties": {
679 "default_format": {
680 "type": "string",
681 "enum": ["csv", "hdf5", "npz", "json"],
682 },
683 "precision": {
684 "type": "integer",
685 "minimum": 1,
686 "maximum": 15,
687 },
688 },
689 },
690 "logging": {
691 "type": "object",
692 "properties": {
693 "level": {
694 "type": "string",
695 "enum": ["DEBUG", "INFO", "WARNING", "ERROR"],
696 },
697 "file": {"type": "string"},
698 },
699 },
700 },
701 },
702 )
703 )
706__all__ = [
707 "ConfigSchema",
708 "SchemaRegistry",
709 "ValidationError",
710 "get_schema_registry",
711 "register_schema",
712 "validate_against_schema",
713]