Coverage for little_loops / events.py: 97%

76 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Event system for little-loops extension architecture. 

2 

3Provides structured event types and a multi-observer event bus that extensions 

4can subscribe to for receiving lifecycle events from FSM loops, issue management, 

5and automation workflows. 

6 

7Public exports: 

8 LLEvent: Structured event dataclass with type, timestamp, and payload 

9 EventBus: Multi-observer event dispatcher with pluggable transports 

10""" 

11 

12from __future__ import annotations 

13 

14import fnmatch 

15import json 

16import logging 

17from collections.abc import Callable 

18from dataclasses import dataclass, field 

19from pathlib import Path 

20from typing import TYPE_CHECKING, Any 

21 

22if TYPE_CHECKING: 

23 from little_loops.transport import Transport 

24 

25logger = logging.getLogger(__name__) 

26 

27# Type for event callback (matches existing EventCallback in fsm/executor.py) 

28EventCallback = Callable[[dict[str, Any]], None] 

29 

30 

31@dataclass 

32class LLEvent: 

33 """Structured event emitted by little-loops subsystems. 

34 

35 Attributes: 

36 type: Event type identifier (e.g. "fsm.state_enter", "fsm.loop_complete") 

37 timestamp: ISO 8601 timestamp string 

38 payload: Type-specific event data 

39 """ 

40 

41 type: str 

42 timestamp: str 

43 payload: dict[str, Any] = field(default_factory=dict) 

44 

45 def to_dict(self) -> dict[str, Any]: 

46 """Convert to dictionary for JSON serialization. 

47 

48 Produces a flat dict compatible with the existing FSM event format: 

49 {"event": type, "ts": timestamp, ...payload} 

50 """ 

51 return {"event": self.type, "ts": self.timestamp, **self.payload} 

52 

53 @classmethod 

54 def from_dict(cls, data: dict[str, Any]) -> LLEvent: 

55 """Create from dictionary (JSON deserialization). 

56 

57 Accepts the flat dict format: {"event": ..., "ts": ..., ...payload} 

58 """ 

59 copy = dict(data) 

60 event_type = copy.pop("event", copy.pop("type", "unknown")) 

61 ts = copy.pop("ts", copy.pop("timestamp", "")) 

62 return cls(type=event_type, timestamp=ts, payload=copy) 

63 

64 @classmethod 

65 def from_raw_event(cls, raw: dict[str, Any]) -> LLEvent: 

66 """Convert existing executor event dict to LLEvent without mutating the original.""" 

67 return cls.from_dict(dict(raw)) 

68 

69 

70class EventBus: 

71 """Multi-observer event dispatcher. 

72 

73 Dispatches event dicts to all registered observers. Exceptions in individual 

74 observers are caught and logged, not propagated. 

75 """ 

76 

77 def __init__(self) -> None: 

78 self._observers: list[tuple[EventCallback, list[str] | None]] = [] 

79 self._transports: list[Transport] = [] 

80 

81 def register(self, callback: EventCallback, filter: str | list[str] | None = None) -> None: 

82 """Register an observer to receive events. 

83 

84 Args: 

85 callback: Callable that receives raw event dicts. 

86 filter: Optional glob pattern(s) to match against the event's ``"event"`` key. 

87 A single string (e.g. ``"issue.*"``) or a list of strings 

88 (e.g. ``["issue.*", "parallel.*"]``). ``None`` (default) means 

89 the observer receives every event — preserving existing behaviour. 

90 FSM executor events use bare names (``"state_enter"``, ``"loop_*"``); 

91 other subsystems use dotted namespaces (``"issue.*"``, ``"parallel.*"``). 

92 """ 

93 patterns: list[str] | None = None 

94 if filter is not None: 

95 patterns = [filter] if isinstance(filter, str) else list(filter) 

96 self._observers.append((callback, patterns)) 

97 

98 def unregister(self, callback: EventCallback) -> None: 

99 """Remove an observer. No-op if not registered.""" 

100 for i, (cb, _) in enumerate(self._observers): 

101 if cb is callback: 

102 del self._observers[i] 

103 return 

104 

105 def add_transport(self, transport: Transport) -> None: 

106 """Register a Transport to receive every emitted event.""" 

107 self._transports.append(transport) 

108 

109 def close_transports(self) -> None: 

110 """Call ``close()`` on every registered transport, isolating exceptions.""" 

111 for transport in self._transports: 

112 try: 

113 transport.close() 

114 except Exception: 

115 logger.warning("EventBus transport close raised an exception", exc_info=True) 

116 

117 def emit(self, event: dict[str, Any]) -> None: 

118 """Dispatch event to all observers and transports. 

119 

120 Observer and transport exceptions are caught and logged to prevent one 

121 sink from blocking others. 

122 """ 

123 event_type = event.get("event", "") 

124 for observer, filter_patterns in self._observers: 

125 if filter_patterns is not None and not any( 

126 fnmatch.fnmatch(event_type, p) for p in filter_patterns 

127 ): 

128 continue 

129 try: 

130 observer(event) 

131 except Exception: 

132 logger.warning("EventBus observer raised an exception", exc_info=True) 

133 

134 for transport in self._transports: 

135 try: 

136 transport.send(event) 

137 except Exception: 

138 logger.warning("EventBus transport raised an exception", exc_info=True) 

139 

140 @staticmethod 

141 def read_events(path: Path) -> list[LLEvent]: 

142 """Read events from a JSONL file. 

143 

144 Returns: 

145 List of LLEvent instances, empty if file doesn't exist. 

146 Malformed lines are silently skipped. 

147 """ 

148 if not path.exists(): 

149 return [] 

150 events: list[LLEvent] = [] 

151 with open(path, encoding="utf-8") as f: 

152 for line in f: 

153 line = line.strip() 

154 if line: 

155 try: 

156 events.append(LLEvent.from_dict(json.loads(line))) 

157 except json.JSONDecodeError: 

158 continue 

159 return events