Coverage for src / tracekit / extensibility / extensions.py: 51%
302 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Extension point registry and management system.
3This module implements a central registry for extension points that allows
4plugins and custom code to extend TraceKit functionality at well-defined
5integration points.
6"""
8from __future__ import annotations
10import logging
11from dataclasses import dataclass, field
12from enum import Enum, auto
13from typing import TYPE_CHECKING, Any, TypeVar
15if TYPE_CHECKING:
16 from collections.abc import Callable
18logger = logging.getLogger(__name__)
20T = TypeVar("T")
23class HookErrorPolicy(Enum):
24 """Policy for handling hook errors.
26 Attributes:
27 CONTINUE: Continue executing remaining hooks after error
28 ABORT: Stop execution immediately on error
29 IGNORE: Ignore error silently
30 """
32 CONTINUE = auto()
33 ABORT = auto()
34 IGNORE = auto()
37@dataclass
38class ExtensionPointSpec:
39 """Specification for an extension point.
41 Defines the contract that implementations must follow including
42 required and optional methods, version info, and documentation.
44 Attributes:
45 name: Unique name for the extension point
46 version: API version (semver format)
47 description: Human-readable description
48 required_methods: List of method names that must be implemented
49 optional_methods: List of optional method names
50 interface: Optional interface class that implementations should inherit from
52 Example:
53 >>> spec = ExtensionPointSpec(
54 ... name="protocol_decoder",
55 ... version="1.0.0",
56 ... description="Decode protocol from waveform",
57 ... required_methods=["decode", "get_metadata"],
58 ... optional_methods=["configure", "reset"]
59 ... )
61 References:
62 EXT-001: Extension Point Registry
63 """
65 name: str
66 version: str = "1.0.0"
67 description: str = ""
68 required_methods: list[str] = field(default_factory=list)
69 optional_methods: list[str] = field(default_factory=list)
70 interface: type | None = None
72 def validate_implementation(self, impl: Any) -> tuple[bool, list[str]]:
73 """Validate that implementation matches interface.
75 Args:
76 impl: Implementation to validate
78 Returns:
79 Tuple of (is_valid, list of missing methods)
81 Example:
82 >>> is_valid, missing = spec.validate_implementation(MyDecoder())
83 >>> if not is_valid:
84 ... print(f"Missing methods: {missing}")
85 """
86 missing = []
87 for method in self.required_methods:
88 if not hasattr(impl, method) or not callable(getattr(impl, method)): 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 missing.append(method)
90 return len(missing) == 0, missing
93@dataclass
94class RegisteredAlgorithm:
95 """Metadata for a registered algorithm.
97 Attributes:
98 name: Algorithm name
99 category: Algorithm category
100 func: Algorithm implementation
101 priority: Execution priority (higher = first)
102 performance: Performance characteristics
103 supports: Supported data types
104 description: Human-readable description
105 complexity: Time complexity string
106 capabilities: Algorithm capabilities
107 memory_usage: Memory usage characteristics
108 registration_order: Order in which algorithm was registered
110 References:
111 EXT-002: Algorithm Registration (capability queries, performance metadata)
112 EXT-004: Priority System (registration order for tie-breaking)
113 """
115 name: str
116 category: str
117 func: Callable[..., Any]
118 priority: int = 50
119 performance: dict[str, str] = field(default_factory=dict)
120 supports: list[str] = field(default_factory=list)
121 description: str = ""
122 complexity: str = "O(n)"
123 capabilities: dict[str, Any] = field(default_factory=dict)
124 memory_usage: str = "unknown"
125 registration_order: int = 0
127 def can(self, capability: str) -> bool:
128 """Check if algorithm has a specific capability.
130 Args:
131 capability: Capability name to check
133 Returns:
134 True if algorithm supports the capability
136 Example:
137 >>> algo.can("multi_channel")
138 True
140 References:
141 EXT-002: Algorithm Registration (capability queries)
142 """
143 return self.capabilities.get(capability, False) # type: ignore[no-any-return]
145 def get_capabilities(self) -> dict[str, Any]:
146 """Get all capabilities of this algorithm.
148 Returns:
149 Dictionary of capability names to values
151 Example:
152 >>> caps = algo.get_capabilities()
153 >>> print(caps)
154 {'multi_channel': True, 'real_time': False, 'max_sample_rate': 1000000}
156 References:
157 EXT-002: Algorithm Registration (capability queries)
158 """
159 return self.capabilities.copy()
162@dataclass
163class HookContext:
164 """Context passed to hook functions.
166 Attributes:
167 data: Primary data being processed
168 metadata: Additional context metadata
169 abort: Set to True to abort operation
170 abort_reason: Reason for abort
172 Example:
173 >>> @tk.hooks.register("pre_decode")
174 >>> def validate_waveform(context):
175 ... if context.data.sample_rate < 1000:
176 ... context.abort = True
177 ... context.abort_reason = "Sample rate too low"
178 ... return context
180 References:
181 EXT-005: Hook System
182 """
184 data: Any = None
185 metadata: dict[str, Any] = field(default_factory=dict)
186 abort: bool = False
187 abort_reason: str = ""
189 def __post_init__(self): # type: ignore[no-untyped-def]
190 """Initialize metadata if None."""
191 if self.metadata is None: 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true
192 self.metadata = {} # type: ignore[unreachable]
195@dataclass
196class RegisteredHook:
197 """Registered hook function.
199 Attributes:
200 hook_point: Name of hook point
201 func: Hook function
202 priority: Execution priority (higher = first)
203 name: Optional hook name
204 """
206 hook_point: str
207 func: Callable[[HookContext], HookContext]
208 priority: int = 50
209 name: str = ""
212class ExtensionPointRegistry:
213 """Central registry of all extension points in TraceKit.
215 Manages registration and lookup of extension points, algorithms,
216 and hooks throughout the system.
218 Example:
219 >>> # List all extension points
220 >>> extension_points = tk.extensions.list()
221 >>> for ep in extension_points:
222 ... print(f"{ep.name} v{ep.version}")
224 >>> # Get specific extension point
225 >>> decoder_ep = tk.extensions.get("protocol_decoder")
226 >>> print(f"Required methods: {decoder_ep.required_methods}")
228 References:
229 EXT-001: Extension Point Registry
230 EXT-002: Algorithm Registration
231 EXT-003: Algorithm Selection
232 EXT-004: Priority System
233 EXT-005: Hook System
234 EXT-006: Custom Decoder Registration
235 """
237 _instance: ExtensionPointRegistry | None = None
239 def __new__(cls) -> ExtensionPointRegistry:
240 """Ensure singleton instance.
242 Returns:
243 Singleton ExtensionPointRegistry instance.
244 """
245 if cls._instance is None: 245 ↛ 254line 245 didn't jump to line 254 because the condition on line 245 was always true
246 cls._instance = super().__new__(cls)
247 cls._instance._extension_points: dict[str, ExtensionPointSpec] = {} # type: ignore[misc, attr-defined]
248 cls._instance._algorithms: dict[str, dict[str, RegisteredAlgorithm]] = {} # type: ignore[misc, attr-defined]
249 cls._instance._hooks: dict[str, list[RegisteredHook]] = {} # type: ignore[misc, attr-defined]
250 cls._instance._hook_error_policy = HookErrorPolicy.CONTINUE
251 cls._instance._log_hook_errors = True
252 cls._instance._initialized = False # type: ignore[has-type]
253 cls._instance._registration_counter = 0 # type: ignore[misc, attr-defined]
254 return cls._instance
256 def initialize(self) -> None:
257 """Initialize built-in extension points.
259 Registers the standard extension points that come with TraceKit.
260 """
261 if self._initialized: # type: ignore[has-type]
262 return
264 # Register standard extension points
265 self.register_point(
266 ExtensionPointSpec(
267 name="protocol_decoder",
268 version="1.0.0",
269 description="Decode protocol from waveform or digital trace",
270 required_methods=["decode", "get_metadata"],
271 optional_methods=["configure", "reset", "validate_config"],
272 )
273 )
275 self.register_point(
276 ExtensionPointSpec(
277 name="file_loader",
278 version="1.0.0",
279 description="Load trace data from file format",
280 required_methods=["load", "can_load"],
281 optional_methods=["get_metadata", "get_channels"],
282 )
283 )
285 self.register_point(
286 ExtensionPointSpec(
287 name="measurement",
288 version="1.0.0",
289 description="Compute measurement from trace",
290 required_methods=["measure"],
291 optional_methods=["validate_input", "get_units"],
292 )
293 )
295 self.register_point(
296 ExtensionPointSpec(
297 name="exporter",
298 version="1.0.0",
299 description="Export trace data to file format",
300 required_methods=["export"],
301 optional_methods=["get_supported_formats"],
302 )
303 )
305 self.register_point(
306 ExtensionPointSpec(
307 name="algorithm",
308 version="1.0.0",
309 description="Signal processing algorithm",
310 required_methods=["process"],
311 optional_methods=["configure", "get_parameters"],
312 )
313 )
315 self._initialized = True
316 logger.debug("Extension point registry initialized with built-in points")
318 # =========================================================================
319 # Extension Point Management (EXT-001)
320 # =========================================================================
322 def register_point(self, spec: ExtensionPointSpec) -> None:
323 """Register an extension point.
325 Args:
326 spec: Extension point specification
328 Raises:
329 ValueError: If extension point already exists
331 Example:
332 >>> spec = ExtensionPointSpec(
333 ... name="my_extension",
334 ... version="1.0.0",
335 ... required_methods=["process"]
336 ... )
337 >>> registry.register_point(spec)
338 """
339 if spec.name in self._extension_points: # type: ignore[attr-defined] 339 ↛ 340line 339 didn't jump to line 340 because the condition on line 339 was never true
340 raise ValueError(f"Extension point '{spec.name}' already registered")
341 self._extension_points[spec.name] = spec # type: ignore[attr-defined]
342 logger.debug(f"Registered extension point: {spec.name} v{spec.version}")
344 def get_point(self, name: str) -> ExtensionPointSpec:
345 """Get extension point specification.
347 Args:
348 name: Extension point name
350 Returns:
351 Extension point specification
353 Raises:
354 KeyError: If extension point not found
355 """
356 if name not in self._extension_points: # type: ignore[attr-defined] 356 ↛ 357line 356 didn't jump to line 357 because the condition on line 356 was never true
357 raise KeyError(
358 f"Extension point '{name}' not found. "
359 f"Available: {list(self._extension_points.keys())}" # type: ignore[attr-defined]
360 )
361 return self._extension_points[name] # type: ignore[no-any-return, attr-defined]
363 def list_points(self) -> list[ExtensionPointSpec]:
364 """List all registered extension points.
366 Returns:
367 List of extension point specifications
368 """
369 return list(self._extension_points.values()) # type: ignore[attr-defined]
371 def exists(self, name: str) -> bool:
372 """Check if extension point exists.
374 Args:
375 name: Extension point name
377 Returns:
378 True if exists
379 """
380 return name in self._extension_points # type: ignore[attr-defined]
382 # =========================================================================
383 # Algorithm Management (EXT-002, EXT-003, EXT-004)
384 # =========================================================================
386 def register_algorithm(
387 self,
388 name: str,
389 func: Callable[..., Any],
390 category: str,
391 priority: int = 50,
392 performance: dict[str, str] | None = None,
393 supports: list[str] | None = None,
394 description: str = "",
395 complexity: str = "O(n)",
396 capabilities: dict[str, Any] | None = None,
397 memory_usage: str = "unknown",
398 ) -> None:
399 """Register a custom algorithm implementation.
401 Args:
402 name: Algorithm name
403 func: Algorithm function
404 category: Algorithm category
405 priority: Execution priority (0-100, higher = first)
406 performance: Performance characteristics dict (speed/accuracy/memory)
407 supports: List of supported data types
408 description: Human-readable description
409 complexity: Time complexity string (e.g., "O(n)", "O(n log n)")
410 capabilities: Algorithm capabilities dict (e.g., {'multi_channel': True})
411 memory_usage: Memory usage characteristics (low/medium/high/unknown)
413 Raises:
414 ValueError: If algorithm already registered
415 TypeError: If func is not callable
417 Example:
418 >>> def my_edge_detector(data, threshold=0.5):
419 ... return find_edges(data, threshold)
420 >>> registry.register_algorithm(
421 ... name="my_detector",
422 ... func=my_edge_detector,
423 ... category="edge_detection",
424 ... priority=75,
425 ... performance={"speed": "fast", "accuracy": "medium", "memory": "low"},
426 ... capabilities={"multi_channel": True, "max_sample_rate": 1000000},
427 ... memory_usage="low"
428 ... )
430 References:
431 EXT-002: Algorithm Registration (capability queries, performance metadata)
432 """
433 if not callable(func): 433 ↛ 434line 433 didn't jump to line 434 because the condition on line 433 was never true
434 raise TypeError(f"Algorithm must be callable, got {type(func).__name__}")
436 if category not in self._algorithms: # type: ignore[attr-defined]
437 self._algorithms[category] = {} # type: ignore[attr-defined]
439 if name in self._algorithms[category]: # type: ignore[attr-defined] 439 ↛ 440line 439 didn't jump to line 440 because the condition on line 439 was never true
440 raise ValueError(f"Algorithm '{name}' already registered in category '{category}'")
442 # Increment registration counter
443 self._registration_counter += 1 # type: ignore[attr-defined]
445 algo = RegisteredAlgorithm(
446 name=name,
447 category=category,
448 func=func,
449 priority=priority,
450 performance=performance or {},
451 supports=supports or [],
452 description=description,
453 complexity=complexity,
454 capabilities=capabilities or {},
455 memory_usage=memory_usage,
456 registration_order=self._registration_counter, # type: ignore[attr-defined]
457 )
459 self._algorithms[category][name] = algo # type: ignore[attr-defined]
460 logger.debug(f"Registered algorithm: {name} in category {category}")
462 def get_algorithm(self, category: str, name: str) -> RegisteredAlgorithm:
463 """Get algorithm by category and name.
465 Args:
466 category: Algorithm category
467 name: Algorithm name
469 Returns:
470 Registered algorithm metadata
472 Raises:
473 KeyError: If not found
474 """
475 if category not in self._algorithms: # type: ignore[attr-defined] 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true
476 raise KeyError(f"Category '{category}' not found")
477 if name not in self._algorithms[category]: # type: ignore[attr-defined] 477 ↛ 478line 477 didn't jump to line 478 because the condition on line 477 was never true
478 raise KeyError(f"Algorithm '{name}' not found in category '{category}'")
479 return self._algorithms[category][name] # type: ignore[no-any-return, attr-defined]
481 def select_algorithm(
482 self,
483 category: str,
484 name: str | None = None,
485 *,
486 optimize_for: str = "speed",
487 constraints: dict[str, Any] | None = None,
488 required_capabilities: list[str] | None = None,
489 ) -> RegisteredAlgorithm:
490 """Select algorithm implementation at runtime.
492 Selects by name if provided, otherwise auto-selects based on
493 optimization criteria and capability matching.
495 Args:
496 category: Algorithm category
497 name: Specific algorithm name (optional)
498 optimize_for: Optimization target: "speed", "accuracy", "memory"
499 constraints: Filter constraints on performance/supports
500 required_capabilities: List of required capabilities for auto-selection
502 Returns:
503 Selected algorithm
505 Raises:
506 KeyError: If category not found or no matching algorithm
508 Example:
509 >>> # Select by name
510 >>> algo = registry.select_algorithm("edge_detection", "fast_detector")
512 >>> # Auto-select for speed
513 >>> algo = registry.select_algorithm(
514 ... "edge_detection",
515 ... optimize_for="speed"
516 ... )
518 >>> # Auto-select by capability matching
519 >>> algo = registry.select_algorithm(
520 ... "edge_detection",
521 ... required_capabilities=["multi_channel", "real_time"]
522 ... )
524 References:
525 EXT-003: Algorithm Selection (auto-selection by capability matching)
526 """
527 if category not in self._algorithms: # type: ignore[attr-defined] 527 ↛ 528line 527 didn't jump to line 528 because the condition on line 527 was never true
528 raise KeyError(f"Category '{category}' not found")
530 if name: 530 ↛ 531line 530 didn't jump to line 531 because the condition on line 530 was never true
531 return self.get_algorithm(category, name)
533 # Auto-select based on criteria
534 candidates = list(self._algorithms[category].values()) # type: ignore[attr-defined]
536 if not candidates: 536 ↛ 537line 536 didn't jump to line 537 because the condition on line 536 was never true
537 raise KeyError(f"No algorithms registered in category '{category}'")
539 # Filter by required capabilities (EXT-003)
540 if required_capabilities: 540 ↛ 541line 540 didn't jump to line 541 because the condition on line 540 was never true
541 filtered = []
542 for algo in candidates:
543 if all(algo.can(cap) for cap in required_capabilities):
544 filtered.append(algo)
545 candidates = filtered
547 if not candidates: 547 ↛ 548line 547 didn't jump to line 548 because the condition on line 547 was never true
548 raise KeyError(f"No algorithms match required capabilities in category '{category}'")
550 # Apply constraints
551 if constraints: 551 ↛ 552line 551 didn't jump to line 552 because the condition on line 551 was never true
552 filtered = []
553 for algo in candidates:
554 match = True
555 for key, value in constraints.items():
556 if key.startswith("performance."):
557 perf_key = key.split(".", 1)[1]
558 if algo.performance.get(perf_key) != value:
559 match = False
560 break
561 elif key.startswith("capabilities."):
562 cap_key = key.split(".", 1)[1]
563 if algo.capabilities.get(cap_key) != value:
564 match = False
565 break
566 elif key == "supports":
567 if isinstance(value, list):
568 if not any(s in algo.supports for s in value):
569 match = False
570 break
571 elif value not in algo.supports:
572 match = False
573 break
574 elif key == "memory_usage":
575 if algo.memory_usage != value:
576 match = False
577 break
578 if match:
579 filtered.append(algo)
580 candidates = filtered
582 if not candidates: 582 ↛ 583line 582 didn't jump to line 583 because the condition on line 582 was never true
583 raise KeyError(f"No algorithms match constraints in category '{category}'")
585 # Sort by optimization criteria
586 if optimize_for == "speed":
588 def sort_key(a): # type: ignore[no-untyped-def]
589 return (
590 0
591 if a.performance.get("speed") == "fast"
592 else 1
593 if a.performance.get("speed") == "medium"
594 else 2,
595 -a.priority,
596 )
597 elif optimize_for == "accuracy": 597 ↛ 608line 597 didn't jump to line 608 because the condition on line 597 was always true
599 def sort_key(a): # type: ignore[no-untyped-def]
600 return (
601 0
602 if a.performance.get("accuracy") == "high"
603 else 1
604 if a.performance.get("accuracy") == "medium"
605 else 2,
606 -a.priority,
607 )
608 elif optimize_for == "memory":
610 def sort_key(a): # type: ignore[no-untyped-def]
611 return (
612 0
613 if a.performance.get("memory") == "low"
614 else 1
615 if a.performance.get("memory") == "medium"
616 else 2,
617 -a.priority,
618 )
619 else:
620 # Default to priority only
621 def sort_key(a): # type: ignore[no-untyped-def]
622 return -a.priority
624 candidates.sort(key=sort_key)
625 return candidates[0] # type: ignore[no-any-return]
627 def list_algorithms(
628 self,
629 category: str,
630 ordered: bool = False,
631 tie_break: str = "name",
632 ) -> list[RegisteredAlgorithm]:
633 """List all algorithms in a category.
635 Args:
636 category: Algorithm category
637 ordered: If True, sort by priority (highest first)
638 tie_break: Tie-breaking rule: "name" (alphabetical) or "registration" (order registered)
640 Returns:
641 List of registered algorithms
643 Raises:
644 KeyError: If category not found
646 Example:
647 >>> # Get algorithms sorted by priority, ties broken by name
648 >>> algos = registry.list_algorithms("edge_detection", ordered=True, tie_break="name")
650 >>> # Get algorithms sorted by priority, ties broken by registration order
651 >>> algos = registry.list_algorithms("edge_detection", ordered=True, tie_break="registration")
653 References:
654 EXT-004: Priority System (tie-breaking rules by name or registration order)
655 """
656 if category not in self._algorithms: # type: ignore[attr-defined] 656 ↛ 657line 656 didn't jump to line 657 because the condition on line 656 was never true
657 raise KeyError(f"Category '{category}' not found")
659 algos = list(self._algorithms[category].values()) # type: ignore[attr-defined]
661 if ordered: 661 ↛ 669line 661 didn't jump to line 669 because the condition on line 661 was always true
662 if tie_break == "registration": 662 ↛ 664line 662 didn't jump to line 664 because the condition on line 662 was never true
663 # Sort by priority (highest first), then by registration order for ties
664 algos.sort(key=lambda a: (-a.priority, a.registration_order))
665 else:
666 # Sort by priority (highest first), then by name for ties (default)
667 algos.sort(key=lambda a: (-a.priority, a.name))
669 return algos
671 def list_categories(self) -> list[str]:
672 """List all algorithm categories.
674 Returns:
675 List of category names
676 """
677 return list(self._algorithms.keys()) # type: ignore[attr-defined]
679 def benchmark_algorithms(
680 self,
681 category: str,
682 test_data: Any,
683 *,
684 metrics: list[str] | None = None,
685 iterations: int = 10,
686 ) -> dict[str, dict[str, float]]:
687 """Benchmark all algorithms in a category.
689 Runs performance tests on all registered algorithms and measures
690 execution time, memory usage, and optionally custom metrics.
692 Args:
693 category: Algorithm category to benchmark
694 test_data: Test data to pass to algorithms
695 metrics: List of metrics to measure (defaults to ["execution_time"])
696 iterations: Number of iterations to average over
698 Returns:
699 Dict mapping algorithm names to metric results
701 Raises:
702 KeyError: If category is not found.
704 Example:
705 >>> import numpy as np
706 >>> test_signal = np.random.randn(1000)
707 >>> results = registry.benchmark_algorithms(
708 ... "edge_detection",
709 ... test_signal,
710 ... metrics=["execution_time", "memory_usage"],
711 ... iterations=100
712 ... )
713 >>> for name, metrics in results.items():
714 ... print(f"{name}: {metrics['execution_time']:.3f}s")
716 References:
717 EXT-003: Algorithm Selection (benchmarking support)
718 """
719 import time
720 import tracemalloc
722 if category not in self._algorithms: # type: ignore[attr-defined]
723 raise KeyError(f"Category '{category}' not found")
725 if metrics is None:
726 metrics = ["execution_time"]
728 results = {}
730 for name, algo in self._algorithms[category].items(): # type: ignore[attr-defined]
731 algo_results = {}
733 if "execution_time" in metrics:
734 times = []
735 for _ in range(iterations):
736 start = time.perf_counter()
737 try:
738 algo.func(test_data)
739 except Exception as e:
740 logger.warning(f"Algorithm {name} failed during benchmark: {e}")
741 times.append(float("inf"))
742 continue
743 end = time.perf_counter()
744 times.append(end - start)
746 algo_results["execution_time"] = sum(times) / len(times)
747 algo_results["min_time"] = min(times)
748 algo_results["max_time"] = max(times)
750 if "memory_usage" in metrics:
751 tracemalloc.start()
752 try:
753 algo.func(test_data)
754 current, peak = tracemalloc.get_traced_memory()
755 algo_results["memory_current"] = current / 1024 / 1024 # MB
756 algo_results["memory_peak"] = peak / 1024 / 1024 # MB
757 except Exception as e:
758 logger.warning(f"Algorithm {name} failed during benchmark: {e}")
759 algo_results["memory_current"] = float("inf")
760 algo_results["memory_peak"] = float("inf")
761 finally:
762 tracemalloc.stop()
764 results[name] = algo_results
766 return results
768 def configure_priorities(self, config: dict[str, dict[str, int]]) -> None:
769 """Override algorithm priorities via configuration.
771 Args:
772 config: Dict mapping category -> {algorithm_name: new_priority}
774 Example:
775 >>> registry.configure_priorities({
776 ... "edge_detection": {
777 ... "fast_detector": 100,
778 ... "accurate_detector": 50
779 ... }
780 ... })
782 References:
783 EXT-004: Priority System
784 """
785 for category, priorities in config.items():
786 if category not in self._algorithms: # type: ignore[attr-defined] 786 ↛ 787line 786 didn't jump to line 787 because the condition on line 786 was never true
787 continue
788 for name, priority in priorities.items():
789 if name in self._algorithms[category]: # type: ignore[attr-defined] 789 ↛ 788line 789 didn't jump to line 788 because the condition on line 789 was always true
790 self._algorithms[category][name].priority = priority # type: ignore[attr-defined]
791 logger.debug(f"Set priority for {category}/{name} to {priority}")
793 # =========================================================================
794 # Hook System (EXT-005)
795 # =========================================================================
797 def register_hook(
798 self,
799 hook_point: str,
800 func: Callable[[HookContext], HookContext],
801 priority: int = 50,
802 name: str = "",
803 ) -> None:
804 """Register a hook function.
806 Args:
807 hook_point: Name of hook point (e.g., "pre_decode", "post_decode")
808 func: Hook function accepting and returning HookContext
809 priority: Execution priority (higher = first)
810 name: Optional hook name for identification
812 Example:
813 >>> @tk.hooks.register("pre_decode")
814 >>> def validate_waveform(context):
815 ... if context.data.sample_rate < 1000:
816 ... raise ValueError("Sample rate too low")
817 ... return context
819 References:
820 EXT-005: Hook System
821 """
822 if hook_point not in self._hooks: # type: ignore[attr-defined]
823 self._hooks[hook_point] = [] # type: ignore[attr-defined]
825 hook = RegisteredHook(
826 hook_point=hook_point,
827 func=func,
828 priority=priority,
829 name=name or func.__name__,
830 )
832 self._hooks[hook_point].append(hook) # type: ignore[attr-defined]
833 # Sort by priority (highest first)
834 self._hooks[hook_point].sort(key=lambda h: -h.priority) # type: ignore[attr-defined]
836 logger.debug(f"Registered hook '{hook.name}' at point '{hook_point}'")
838 def execute_hooks(self, hook_point: str, context: HookContext) -> HookContext:
839 """Execute all hooks at a hook point with chaining and error isolation.
841 Hooks are executed in priority order (highest first). Each hook receives
842 the context from the previous hook (chaining). If a hook fails, the error
843 is isolated based on the configured error policy, preventing one hook's
844 failure from stopping other hooks.
846 Args:
847 hook_point: Hook point name
848 context: Hook context to pass through
850 Returns:
851 Modified context after all hooks
853 Raises:
854 Exception: If error policy is ABORT and a hook fails.
856 Example:
857 >>> context = HookContext(data=trace)
858 >>> context = registry.execute_hooks("pre_decode", context)
859 >>> if context.abort:
860 ... raise ValueError(context.abort_reason)
862 References:
863 EXT-005: Hook System (hook chaining, error isolation)
864 """
865 if hook_point not in self._hooks: # type: ignore[attr-defined] 865 ↛ 866line 865 didn't jump to line 866 because the condition on line 865 was never true
866 return context
868 # Execute hooks in priority order (hook chaining - EXT-005)
869 for hook in self._hooks[hook_point]: # type: ignore[attr-defined]
870 try:
871 context = hook.func(context)
872 if context.abort: 872 ↛ 873line 872 didn't jump to line 873 because the condition on line 872 was never true
873 logger.info(f"Hook '{hook.name}' requested abort: {context.abort_reason}")
874 break
875 except Exception as e:
876 # Error isolation - EXT-005: one hook failure doesn't stop others
877 if self._log_hook_errors:
878 logger.error(f"Hook '{hook.name}' at '{hook_point}' failed: {e}")
880 if self._hook_error_policy == HookErrorPolicy.ABORT:
881 raise
882 elif self._hook_error_policy == HookErrorPolicy.CONTINUE:
883 continue # Continue to next hook despite error
884 # IGNORE falls through
886 return context
888 def configure_hooks(self, on_error: str = "continue", log_errors: bool = True) -> None:
889 """Configure hook error handling behavior.
891 Args:
892 on_error: Error policy: "continue", "abort", "ignore"
893 log_errors: Whether to log hook errors
895 References:
896 EXT-005: Hook System
897 """
898 policy_map = {
899 "continue": HookErrorPolicy.CONTINUE,
900 "abort": HookErrorPolicy.ABORT,
901 "ignore": HookErrorPolicy.IGNORE,
902 }
903 self._hook_error_policy = policy_map.get(on_error, HookErrorPolicy.CONTINUE)
904 self._log_hook_errors = log_errors
906 def list_hooks(self, hook_point: str | None = None) -> dict[str, list[str]]:
907 """List registered hooks.
909 Args:
910 hook_point: Specific hook point, or None for all
912 Returns:
913 Dict mapping hook points to list of hook names
914 """
915 if hook_point:
916 if hook_point not in self._hooks: # type: ignore[attr-defined]
917 return {hook_point: []}
918 return {hook_point: [h.name for h in self._hooks[hook_point]]} # type: ignore[attr-defined]
920 return {point: [h.name for h in hooks] for point, hooks in self._hooks.items()} # type: ignore[attr-defined]
922 def clear_hooks(self, hook_point: str | None = None) -> None:
923 """Clear registered hooks.
925 Args:
926 hook_point: Specific hook point to clear, or None for all
927 """
928 if hook_point:
929 self._hooks.pop(hook_point, None) # type: ignore[attr-defined]
930 else:
931 self._hooks.clear() # type: ignore[attr-defined]
933 # =========================================================================
934 # Custom Decoder Registration (EXT-006)
935 # =========================================================================
937 def register_decoder(self, protocol: str, decoder_class: type, priority: int = 50) -> None:
938 """Register a custom protocol decoder.
940 Args:
941 protocol: Protocol name (e.g., "uart", "spi", "my_custom")
942 decoder_class: Decoder class implementing ProtocolDecoder interface
943 priority: Registration priority
945 Raises:
946 ValueError: If decoder doesn't implement required interface or lacks documentation
948 Example:
949 >>> class MyDecoder:
950 ... '''Custom decoder for my protocol.'''
951 ... def decode(self, trace):
952 ... return []
953 ... def get_metadata(self):
954 ... return {"name": "my_decoder"}
955 >>> registry.register_decoder("my_protocol", MyDecoder)
957 References:
958 EXT-006: Custom Decoder Registration (validation of decoder interface, documentation requirements)
959 """
960 # Validate decoder implements required interface
961 spec = self.get_point("protocol_decoder")
962 instance = decoder_class()
963 is_valid, missing = spec.validate_implementation(instance)
965 if not is_valid: 965 ↛ 966line 965 didn't jump to line 966 because the condition on line 965 was never true
966 raise ValueError(f"Decoder '{protocol}' missing required methods: {missing}")
968 # Check documentation requirements (EXT-006)
969 if not decoder_class.__doc__ or not decoder_class.__doc__.strip(): 969 ↛ 970line 969 didn't jump to line 970 because the condition on line 969 was never true
970 raise ValueError(
971 f"Decoder '{protocol}' must have a docstring documenting its purpose and usage"
972 )
974 # Register as algorithm in protocol_decoder category
975 self.register_algorithm(
976 name=protocol,
977 func=decoder_class,
978 category="protocol_decoder",
979 priority=priority,
980 description=decoder_class.__doc__.strip().split("\n")[0]
981 if decoder_class.__doc__
982 else f"Protocol decoder for {protocol}",
983 )
985 logger.info(f"Registered custom decoder for protocol: {protocol}")
987 def get_decoder(self, protocol: str) -> type:
988 """Get decoder class for a protocol.
990 Args:
991 protocol: Protocol name
993 Returns:
994 Decoder class
995 """
996 algo = self.get_algorithm("protocol_decoder", protocol)
997 return algo.func # type: ignore[return-value]
999 def list_decoders(self) -> list[str]:
1000 """List all registered protocol decoders.
1002 Returns:
1003 List of protocol names
1004 """
1005 if "protocol_decoder" not in self._algorithms: # type: ignore[attr-defined] 1005 ↛ 1006line 1005 didn't jump to line 1006 because the condition on line 1005 was never true
1006 return []
1007 return list(self._algorithms["protocol_decoder"].keys()) # type: ignore[attr-defined]
1010# Global registry instance
1011_registry = ExtensionPointRegistry()
1014# =========================================================================
1015# Module-Level Convenience Functions
1016# =========================================================================
1019def get_registry() -> ExtensionPointRegistry:
1020 """Get the global extension point registry.
1022 Returns:
1023 Global ExtensionPointRegistry instance
1024 """
1025 _registry.initialize()
1026 return _registry
1029def list_extension_points() -> list[ExtensionPointSpec]:
1030 """List all registered extension points.
1032 Returns:
1033 List of extension point specifications
1035 References:
1036 EXT-001: Extension Point Registry
1037 """
1038 return get_registry().list_points()
1041def get_extension_point(name: str) -> ExtensionPointSpec:
1042 """Get extension point by name.
1044 Args:
1045 name: Extension point name
1047 Returns:
1048 Extension point specification
1050 References:
1051 EXT-001: Extension Point Registry
1052 """
1053 return get_registry().get_point(name)
1056def extension_point_exists(name: str) -> bool:
1057 """Check if extension point exists.
1059 Args:
1060 name: Extension point name
1062 Returns:
1063 True if exists
1065 References:
1066 EXT-001: Extension Point Registry
1067 """
1068 return get_registry().exists(name)
1071def register_extension_point(spec: ExtensionPointSpec) -> None:
1072 """Register a new extension point.
1074 Args:
1075 spec: Extension point specification
1077 References:
1078 EXT-001: Extension Point Registry
1079 """
1080 get_registry().register_point(spec)
1083# Hook decorator
1084def hook(hook_point: str, priority: int = 50, name: str = ""): # type: ignore[no-untyped-def]
1085 """Decorator for registering hook functions.
1087 Args:
1088 hook_point: Hook point name
1089 priority: Execution priority
1090 name: Optional hook name
1092 Returns:
1093 Decorator function that registers the hook.
1095 Example:
1096 >>> @hook("pre_decode", priority=100)
1097 >>> def validate_input(context):
1098 ... # validation logic
1099 ... return context
1101 References:
1102 EXT-005: Hook System
1103 """
1105 def decorator(func: Callable[[HookContext], HookContext]): # type: ignore[no-untyped-def]
1106 get_registry().register_hook(hook_point, func, priority, name or func.__name__)
1107 return func
1109 return decorator
1112__all__ = [
1113 "ExtensionPointRegistry",
1114 "ExtensionPointSpec",
1115 "HookContext",
1116 "HookErrorPolicy",
1117 "RegisteredAlgorithm",
1118 "RegisteredHook",
1119 "extension_point_exists",
1120 "get_extension_point",
1121 "get_registry",
1122 "hook",
1123 "list_extension_points",
1124 "register_extension_point",
1125]