Coverage for little_loops / extension.py: 88%
110 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Extension system for little-loops.
3Provides the extension Protocol, a reference implementation, and discovery/loading
4utilities for external packages to hook into little-loops via structured events.
6Public exports:
7 LLExtension: Protocol that extensions must satisfy
8 NoopLoggerExtension: Reference extension that logs events to a JSONL file
9 ExtensionLoader: Discovers and loads extensions from config and entry points
10"""
12from __future__ import annotations
14import importlib
15import json
16import logging
17from collections.abc import Callable
18from importlib.metadata import entry_points
19from pathlib import Path
20from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
22from little_loops.events import EventBus, EventCallback, LLEvent
23from little_loops.issue_parser import IssueInfo
25if TYPE_CHECKING:
26 from little_loops.fsm.executor import FSMExecutor, RouteContext, RouteDecision
27 from little_loops.fsm.persistence import PersistentExecutor
28 from little_loops.fsm.runners import ActionRunner
29 from little_loops.fsm.types import Evaluator
30 from little_loops.hooks.types import LLHookEvent, LLHookResult
# Module-level logger used by the loader and wiring code below to report
# load failures and wiring summaries.
logger = logging.getLogger(__name__)

# Entry-point group name queried by ExtensionLoader.from_entry_points().
ENTRY_POINT_GROUP = "little_loops.extensions"
@runtime_checkable
class LLExtension(Protocol):
    """Structural interface for little-loops extensions.

    An extension is any object that exposes an ``on_event`` method with the
    signature declared here. Events are delivered as structured ``LLEvent``
    objects originating from the EventBus.

    An extension may also set ``event_filter`` to restrict delivery to
    particular event namespaces using glob patterns, either a single pattern
    such as ``"issue.*"`` or a list such as ``["issue.*", "parallel.*"]``.
    A value of ``None`` means every event is delivered.

    NOTE(review): ``event_filter`` is declared as a protocol member, so a
    runtime ``isinstance(obj, LLExtension)`` check presumably also requires
    the attribute to be present on ``obj`` -- confirm this matches the
    "optional" wording above.
    """

    # Glob pattern(s) selecting which events are delivered; None = all events.
    event_filter: str | list[str] | None

    def on_event(self, event: LLEvent) -> None:
        """React to a single event emitted by little-loops.

        Args:
            event: Structured event with type, timestamp, and payload
        """
        ...
class InterceptorExtension(Protocol):
    """Structural interface for extensions that hook into FSM routing.

    ``wire_extensions()`` recognises interceptors with ``hasattr()`` checks on
    the individual method names, so this protocol is not decorated with
    ``@runtime_checkable``. An extension may implement any subset of the
    methods below; each one is detected independently.
    """

    def before_route(self, context: RouteContext) -> RouteDecision | None:
        """Run ahead of routing.

        Return a ``RouteDecision`` to redirect or veto the route, or ``None``
        to let routing proceed unchanged.
        """
        ...

    def after_route(self, context: RouteContext) -> None:
        """Run once routing has been resolved; purely observational."""
        ...

    def before_issue_close(self, info: IssueInfo) -> bool | None:
        """Run ahead of closing an issue.

        Return ``False`` to veto the close, or ``None`` to let it proceed.
        """
        ...
class ActionProviderExtension(Protocol):
    """Structural interface for extensions that supply custom FSM actions.

    ``wire_extensions()`` recognises providers with a ``hasattr()`` check for
    ``provided_actions``, so ``@runtime_checkable`` is not required here.
    """

    def provided_actions(self) -> dict[str, ActionRunner]:
        """Return the actions to inject into the executor, keyed by action name."""
        ...
class EvaluatorProviderExtension(Protocol):
    """Structural interface for extensions that supply custom FSM evaluators.

    ``wire_extensions()`` recognises providers with a ``hasattr()`` check for
    ``provided_evaluators``, so ``@runtime_checkable`` is not required here.
    """

    def provided_evaluators(self) -> dict[str, Evaluator]:
        """Return the evaluator callables to register, keyed by evaluator type name."""
        ...
@runtime_checkable
class LLHookIntentExtension(Protocol):
    """Structural interface for extensions that supply hook intent handlers.

    ``wire_extensions()`` recognises providers with a ``hasattr()`` check for
    ``provided_hook_intents`` and passes the returned mapping to
    ``little_loops.hooks._register_hook_intents``. The handlers are reported
    to end up in the dispatch table consulted by
    ``little_loops.hooks.main_hooks()`` (not visible from this module).
    """

    def provided_hook_intents(self) -> dict[str, Callable[[LLHookEvent], LLHookResult]]:
        """Return the hook handlers to register, keyed by intent name."""
        ...
class NoopLoggerExtension:
    """Reference extension that logs events to a JSONL file.

    Demonstrates the extension API by writing each event as one JSON line to
    a log file. Useful for debugging and as a template for building custom
    extensions.

    Args:
        log_path: File to append events to. When omitted, events go to
            ``.ll/extension-events.jsonl`` relative to the current working
            directory. The parent directory is created on construction.
    """

    # Declared explicitly as "no filter" so the reference implementation
    # exposes every member named by the LLExtension protocol (which lists
    # ``event_filter`` alongside ``on_event``). None means all events.
    event_filter: str | list[str] | None = None

    def __init__(self, log_path: Path | None = None) -> None:
        self._log_path = log_path or Path(".ll/extension-events.jsonl")
        # Create the directory up front so on_event() can append immediately.
        self._log_path.parent.mkdir(parents=True, exist_ok=True)

    def on_event(self, event: LLEvent) -> None:
        """Append ``event`` to the log file as a single JSON line.

        Args:
            event: Event to record; serialised via ``event.to_dict()``.
        """
        with open(self._log_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(event.to_dict()) + "\n")
class ExtensionLoader:
    """Locate and instantiate extensions named in config or advertised via entry points."""

    @staticmethod
    def from_config(extension_paths: list[str]) -> list[LLExtension]:
        """Instantiate extensions named by ``"module.path:ClassName"`` strings.

        Any entry that cannot be parsed, imported, resolved, or instantiated
        is skipped and reported as a warning (with traceback) on the module
        logger; the remaining entries are still processed.

        Args:
            extension_paths: List of "module:Class" strings

        Returns:
            Instances that loaded successfully, in input order
        """
        loaded: list[Any] = []
        for spec in extension_paths:
            try:
                # Split on the last colon: left is the module, right the attribute.
                module_name, attr_name = spec.rsplit(":", 1)
                factory = getattr(importlib.import_module(module_name), attr_name)
                loaded.append(factory())
            except Exception:
                logger.warning("Failed to load extension from config: %s", spec, exc_info=True)
        return loaded

    @staticmethod
    def from_entry_points() -> list[LLExtension]:
        """Instantiate extensions advertised through ``importlib.metadata`` entry points.

        Only the ``ENTRY_POINT_GROUP`` group ("little_loops.extensions") is
        consulted. Entry points that fail to load or instantiate are skipped
        and reported as a warning on the module logger.

        Returns:
            Instances that loaded successfully
        """
        try:
            discovered = entry_points(group=ENTRY_POINT_GROUP)
        except TypeError:
            # Fallback for importlib.metadata APIs where entry_points() does not
            # accept the ``group`` keyword and instead returns a mapping keyed
            # by group name.
            discovered = entry_points().get(ENTRY_POINT_GROUP, [])  # type: ignore[attr-defined, assignment]

        loaded: list[Any] = []
        for entry in discovered:
            try:
                factory = entry.load()
                loaded.append(factory())
            except Exception:
                logger.warning(
                    "Failed to load extension entry point: %s", entry.name, exc_info=True
                )
        return loaded

    @staticmethod
    def load_all(config_paths: list[str] | None = None) -> list[LLExtension]:
        """Gather extensions from every discovery source.

        Config-declared extensions come first, followed by those found via
        entry points.

        Args:
            config_paths: Optional list of "module:Class" strings from config

        Returns:
            Combined list of all loaded extensions
        """
        configured: list[Any] = (
            ExtensionLoader.from_config(config_paths) if config_paths else []
        )
        return [*configured, *ExtensionLoader.from_entry_points()]
def _merge_contributions(registry: dict[str, Any], contributed: dict[str, Any], kind: str) -> None:
    """Add one extension's contributions to ``registry``, refusing name clashes.

    Nothing from ``contributed`` is merged if any of its names is already
    present in ``registry``.

    Raises:
        ValueError: If a contributed name is already registered.
    """
    for name in contributed:
        if name in registry:
            raise ValueError(
                f"Extension conflict: {kind} '{name}' already registered by another extension"
            )
    registry.update(contributed)


def wire_extensions(
    bus: EventBus,
    config_paths: list[str] | None = None,
    executor: FSMExecutor | PersistentExecutor | None = None,
) -> list[LLExtension]:
    """Load extensions and register them on an EventBus and optional FSMExecutor.

    Extensions are loaded via ``ExtensionLoader.load_all()`` and then ordered
    by their optional ``priority`` attribute (ascending, default ``0``; the
    sort is stable, so ties keep load order).

    Each extension's ``on_event`` callback is wrapped to convert the raw
    ``dict[str, Any]`` dispatched by ``EventBus.emit()`` into an ``LLEvent``
    using ``from_raw_event()``, and is registered with the extension's
    ``event_filter`` (``None`` when the attribute is absent).

    When ``executor`` is provided, a second pass populates
    ``executor._contributed_actions``, ``executor._contributed_evaluators``,
    and ``executor._interceptors`` from each extension that implements the
    corresponding methods. If ``executor`` has an ``_executor`` attribute,
    that inner object is populated instead.

    A final pass hands the result of every ``provided_hook_intents()`` to
    ``little_loops.hooks._register_hook_intents``.

    Args:
        bus: EventBus to register extension callbacks on
        config_paths: Optional list of "module:Class" strings from config
        executor: Optional FSMExecutor to populate with contributed types

    Returns:
        List of loaded extension instances, in priority order

    Raises:
        ValueError: If an extension contributes an action or evaluator whose
            name is already registered on the executor.
    """
    extensions = ExtensionLoader.load_all(config_paths)
    extensions = sorted(extensions, key=lambda e: getattr(e, "priority", 0))

    def _make_callback(e: LLExtension) -> EventCallback:
        # Factory so each callback closes over its own extension rather than
        # the loop variable.
        def _cb(event: dict[str, Any]) -> None:
            e.on_event(LLEvent.from_raw_event(event))

        return _cb

    for ext in extensions:
        if hasattr(ext, "on_event"):
            bus.register(_make_callback(ext), filter=getattr(ext, "event_filter", None))

    # Unwrap executors that delegate to an inner FSM executor.
    fsm_executor: FSMExecutor | None
    if executor is None:
        fsm_executor = None
    elif hasattr(executor, "_executor"):
        fsm_executor = executor._executor
    else:
        fsm_executor = executor

    if fsm_executor is not None:
        for ext in extensions:
            if hasattr(ext, "provided_actions"):
                # Call the provider once so the mapping that is checked for
                # conflicts is the same one that gets merged.
                _merge_contributions(
                    fsm_executor._contributed_actions, ext.provided_actions(), "action"
                )
            if hasattr(ext, "provided_evaluators"):
                _merge_contributions(
                    fsm_executor._contributed_evaluators,
                    ext.provided_evaluators(),
                    "evaluator",
                )
            if (
                hasattr(ext, "before_route")
                or hasattr(ext, "after_route")
                or hasattr(ext, "before_issue_close")
            ):
                fsm_executor._interceptors.append(ext)

    for ext in extensions:
        if hasattr(ext, "provided_hook_intents"):
            # Imported lazily, only when an extension actually provides intents.
            from little_loops.hooks import _register_hook_intents

            _register_hook_intents(ext.provided_hook_intents())

    if extensions:
        logger.info("Wired %d extension(s) to EventBus", len(extensions))
    return extensions