Coverage for agentos/tools/event_bus.py: 91%

111 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 06:58 +0800

1""" 

2Event Bus / Pub-Sub for AgentOS. 

3 

4EventBus — in-process pub/sub with topic wildcards, async dispatch, and replay. 

5supports: exact topic match, `*` single-level, `**` multi-level wildcards. 

6""" 

7 

8import fnmatch 

9import threading 

10import time 

11from collections import defaultdict 

12from dataclasses import dataclass, field 

13from typing import Any, Callable, Dict, List, Optional, Set 

14 

15 

16# ============================================================================ 

17# Event 

18# ============================================================================ 

19 

20@dataclass 

21class Event: 

22 """A published event with topic, payload, and metadata.""" 

23 topic: str 

24 data: Any = None 

25 timestamp: float = field(default_factory=time.time) 

26 source: str = "" 

27 

28 

29Subscriber = Callable[[Event], None] 

30UnsubscribeHandle = Callable[[], None] 

31 

32 

33# ============================================================================ 

34# EventBus 

35# ============================================================================ 

36 

37class EventBus: 

38 """Thread-safe in-process pub/sub event bus. 

39 

40 Topics use dot-separated paths: 'agent.tool.call', 'system.shutdown'. 

41 Wildcards: 'agent.*.call' (single level), 'agent.**' (multi-level). 

42 """ 

43 

44 def __init__(self): 

45 self._subscribers: Dict[str, List[Subscriber]] = defaultdict(list) 

46 self._lock = threading.RLock() 

47 self._history: List[Event] = [] 

48 self._max_history: int = 1000 

49 self._total_published: int = 0 

50 self._total_delivered: int = 0 

51 self._running: bool = True 

52 

53 def subscribe(self, topic: str, callback: Subscriber) -> UnsubscribeHandle: 

54 """Subscribe to a topic (supports wildcards). Returns unsubscribe handle.""" 

55 with self._lock: 

56 self._subscribers[topic].append(callback) 

57 # Return a closure that removes this specific callback 

58 sub_list = self._subscribers[topic] 

59 

60 def unsubscribe(): 

61 with self._lock: 

62 if callback in sub_list: 

63 sub_list.remove(callback) 

64 

65 return unsubscribe 

66 

67 def unsubscribe(self, topic: str, callback: Subscriber) -> bool: 

68 """Remove a specific subscriber. Returns True if found and removed.""" 

69 with self._lock: 

70 subs = self._subscribers.get(topic) 

71 if subs and callback in subs: 

72 subs.remove(callback) 

73 return True 

74 return False 

75 

76 def publish(self, topic: str, data: Any = None, source: str = "") -> None: 

77 """Publish an event to all matching subscribers synchronously.""" 

78 event = Event(topic=topic, data=data, source=source) 

79 with self._lock: 

80 self._history.append(event) 

81 if len(self._history) > self._max_history: 

82 self._history = self._history[-self._max_history:] 

83 self._total_published += 1 

84 

85 if not self._running: 

86 return 

87 

88 for sub_topic, callbacks in list(self._subscribers.items()): 

89 if self._topic_match(sub_topic, topic): 

90 for cb in callbacks[:]: # copy for safe iteration 

91 try: 

92 cb(event) 

93 self._total_delivered += 1 

94 except Exception: 

95 pass 

96 

97 def _topic_match(self, pattern: str, topic: str) -> bool: 

98 """Match a topic against a pattern with wildcard support. 

99 - `*` matches a single level (dot-delimited). 

100 - `**` matches zero or more levels. 

101 """ 

102 if '**' in pattern or '*' in pattern: 

103 return self._wildcard_match(pattern, topic) 

104 return pattern == topic 

105 

106 def _wildcard_match(self, pattern: str, topic: str) -> bool: 

107 """fnmatch-style glob matching on dot-delimited topic paths.""" 

108 return fnmatch.fnmatch(topic, pattern) 

109 

110 def get_history(self, limit: int = 100) -> List[Event]: 

111 """Get recent published events.""" 

112 with self._lock: 

113 return list(self._history[-limit:]) 

114 

115 def clear_history(self) -> None: 

116 with self._lock: 

117 self._history.clear() 

118 

119 def stop(self) -> None: 

120 """Stop delivering events (still records history).""" 

121 with self._lock: 

122 self._running = False 

123 

124 def start(self) -> None: 

125 with self._lock: 

126 self._running = True 

127 

128 def subscriber_count(self, topic: Optional[str] = None) -> int: 

129 """Count subscribers. If topic given, counts for that pattern only.""" 

130 with self._lock: 

131 if topic: 

132 return len(self._subscribers.get(topic, [])) 

133 return sum(len(v) for v in self._subscribers.values()) 

134 

135 @property 

136 def stats(self) -> Dict[str, Any]: 

137 with self._lock: 

138 return { 

139 "total_published": self._total_published, 

140 "total_delivered": self._total_delivered, 

141 "subscriber_count": self.subscriber_count(), 

142 "history_size": len(self._history), 

143 "topics": list(self._subscribers.keys()), 

144 } 

145 

146 

147# ============================================================================ 

148# TopicFilter 

149# ============================================================================ 

150 

151class TopicFilter: 

152 """Pre-compiled topic filter chain for high-throughput event routing.""" 

153 

154 def __init__(self): 

155 self._filters: Dict[str, Callable[[Event], bool]] = {} 

156 self._lock = threading.Lock() 

157 

158 def add(self, name: str, predicate: Callable[[Event], bool]) -> None: 

159 with self._lock: 

160 self._filters[name] = predicate 

161 

162 def remove(self, name: str) -> bool: 

163 with self._lock: 

164 return self._filters.pop(name, None) is not None 

165 

166 def evaluate(self, event: Event) -> List[str]: 

167 """Return names of all matching filters for this event.""" 

168 matches = [] 

169 with self._lock: 

170 for name, pred in self._filters.items(): 

171 try: 

172 if pred(event): 

173 matches.append(name) 

174 except Exception: 

175 pass 

176 return matches 

177 

178 

179# ============================================================================ 

180# Global singleton 

181# ============================================================================ 

182 

183_default_bus: Optional[EventBus] = None 

184_default_lock = threading.Lock() 

185 

186 

187def get_event_bus() -> EventBus: 

188 """Get or create the global default EventBus.""" 

189 global _default_bus 

190 if _default_bus is None: 

191 with _default_lock: 

192 if _default_bus is None: 

193 _default_bus = EventBus() 

194 return _default_bus