Coverage for little_loops / events.py: 97%
76 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Event system for little-loops extension architecture.
3Provides structured event types and a multi-observer event bus that extensions
4can subscribe to for receiving lifecycle events from FSM loops, issue management,
5and automation workflows.
7Public exports:
8 LLEvent: Structured event dataclass with type, timestamp, and payload
9 EventBus: Multi-observer event dispatcher with pluggable transports
10"""
12from __future__ import annotations
14import fnmatch
15import json
16import logging
17from collections.abc import Callable
18from dataclasses import dataclass, field
19from pathlib import Path
20from typing import TYPE_CHECKING, Any
22if TYPE_CHECKING:
23 from little_loops.transport import Transport
25logger = logging.getLogger(__name__)
27# Type for event callback (matches existing EventCallback in fsm/executor.py)
28EventCallback = Callable[[dict[str, Any]], None]
@dataclass
class LLEvent:
    """Structured event emitted by little-loops subsystems.

    Attributes:
        type: Event type identifier (e.g. "fsm.state_enter", "fsm.loop_complete")
        timestamp: ISO 8601 timestamp string
        payload: Type-specific event data
    """

    type: str
    timestamp: str
    payload: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dictionary for JSON serialization.

        Produces a flat dict compatible with the existing FSM event format:
            {"event": type, "ts": timestamp, ...payload}

        The payload is spread last, so a payload key named ``"event"`` or
        ``"ts"`` replaces the corresponding header value in the result.
        """
        return {"event": self.type, "ts": self.timestamp, **self.payload}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> LLEvent:
        """Create an event from a dictionary (JSON deserialization).

        Accepts the flat dict format ``{"event": ..., "ts": ..., ...payload}``.
        When ``"event"`` is absent the ``"type"`` key is used instead (falling
        back to ``"unknown"``); when ``"ts"`` is absent the ``"timestamp"`` key
        is used instead (falling back to ``""``). Every key that was not
        consumed as the type or timestamp becomes part of the payload.

        Args:
            data: Flat event dict. It is copied, never mutated.

        Returns:
            A new ``LLEvent`` built from ``data``.
        """
        remaining = dict(data)

        # Only consume the alternate key when the primary one is missing.
        # Popping both unconditionally would drop a legitimate payload entry
        # called "type"/"timestamp" and break to_dict()/from_dict() round trips.
        if "event" in remaining:
            event_type = remaining.pop("event")
        else:
            event_type = remaining.pop("type", "unknown")

        if "ts" in remaining:
            ts = remaining.pop("ts")
        else:
            ts = remaining.pop("timestamp", "")

        return cls(type=event_type, timestamp=ts, payload=remaining)

    @classmethod
    def from_raw_event(cls, raw: dict[str, Any]) -> LLEvent:
        """Convert an existing executor event dict to an LLEvent.

        The original dict is not mutated: ``from_dict`` works on its own copy.
        """
        return cls.from_dict(raw)
class EventBus:
    """Multi-observer event dispatcher.

    Dispatches event dicts to all registered observers and transports.
    Exceptions raised by individual observers or transports are caught and
    logged, not propagated.
    """

    def __init__(self) -> None:
        # Each entry pairs a callback with its glob patterns (None = no filter).
        self._observers: list[tuple[EventCallback, list[str] | None]] = []
        self._transports: list[Transport] = []

    def register(self, callback: EventCallback, filter: str | list[str] | None = None) -> None:
        """Register an observer to receive events.

        Args:
            callback: Callable that receives raw event dicts.
            filter: Optional glob pattern(s) to match against the event's ``"event"`` key.
                A single string (e.g. ``"issue.*"``) or a list of strings
                (e.g. ``["issue.*", "parallel.*"]``). ``None`` (default) means
                the observer receives every event — preserving existing behaviour.
                FSM executor events use bare names (``"state_enter"``, ``"loop_*"``);
                other subsystems use dotted namespaces (``"issue.*"``, ``"parallel.*"``).
        """
        patterns: list[str] | None = None
        if filter is not None:
            # Normalise to a private list so later caller-side mutation of the
            # argument cannot change the registered filter.
            patterns = [filter] if isinstance(filter, str) else list(filter)
        self._observers.append((callback, patterns))

    def unregister(self, callback: EventCallback) -> None:
        """Remove the first registration of ``callback``. No-op if not registered.

        Callbacks are compared by identity (``is``), not equality.
        """
        for index, (registered, _) in enumerate(self._observers):
            if registered is callback:
                del self._observers[index]
                return

    def add_transport(self, transport: Transport) -> None:
        """Register a Transport to receive every emitted event."""
        self._transports.append(transport)

    def close_transports(self) -> None:
        """Call ``close()`` on every registered transport, isolating exceptions."""
        # Iterate a snapshot so the transport list may change while closing.
        for transport in list(self._transports):
            try:
                transport.close()
            except Exception:
                logger.warning("EventBus transport close raised an exception", exc_info=True)

    def emit(self, event: dict[str, Any]) -> None:
        """Dispatch event to all observers and transports.

        Observer and transport exceptions are caught and logged to prevent one
        sink from blocking others.

        Dispatch runs over a snapshot of the registrations taken when the call
        starts: an observer may register or unregister callbacks (including
        itself) from inside its handler without causing other observers to be
        skipped. Such changes take effect from the next ``emit`` call.

        Args:
            event: Raw event dict; its ``"event"`` key (default ``""``) is what
                observer filter patterns are matched against.
        """
        event_type = event.get("event", "")

        # Snapshot: deleting from the live list mid-iteration would shift the
        # remaining entries and silently skip the next observer.
        for observer, filter_patterns in list(self._observers):
            if filter_patterns is not None and not any(
                fnmatch.fnmatch(event_type, pattern) for pattern in filter_patterns
            ):
                continue
            try:
                observer(event)
            except Exception:
                logger.warning("EventBus observer raised an exception", exc_info=True)

        for transport in list(self._transports):
            try:
                transport.send(event)
            except Exception:
                logger.warning("EventBus transport raised an exception", exc_info=True)

    @staticmethod
    def read_events(path: Path) -> list[LLEvent]:
        """Read events from a JSONL file.

        Args:
            path: Location of the JSONL file, one JSON object per line.

        Returns:
            List of LLEvent instances, empty if file doesn't exist.
            Blank lines, lines that are not valid JSON, and lines whose JSON
            value is not an object are silently skipped.
        """
        if not path.exists():
            return []
        events: list[LLEvent] = []
        with open(path, encoding="utf-8") as f:
            for raw_line in f:
                text = raw_line.strip()
                if not text:
                    continue
                try:
                    record = json.loads(text)
                except json.JSONDecodeError:
                    continue
                # Valid JSON that is not an object (e.g. ``42`` or ``[1]``)
                # cannot be an event; treat it as malformed rather than
                # letting the conversion below raise.
                if not isinstance(record, dict):
                    continue
                events.append(LLEvent.from_dict(record))
        return events