Coverage for agentos/tools/event_bus.py: 91%
111 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 06:58 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 06:58 +0800
1"""
2Event Bus / Pub-Sub for AgentOS.
4EventBus — in-process pub/sub with topic wildcards, async dispatch, and replay.
5supports: exact topic match, `*` single-level, `**` multi-level wildcards.
6"""
8import fnmatch
9import threading
10import time
11from collections import defaultdict
12from dataclasses import dataclass, field
13from typing import Any, Callable, Dict, List, Optional, Set
16# ============================================================================
17# Event
18# ============================================================================
20@dataclass
21class Event:
22 """A published event with topic, payload, and metadata."""
23 topic: str
24 data: Any = None
25 timestamp: float = field(default_factory=time.time)
26 source: str = ""
29Subscriber = Callable[[Event], None]
30UnsubscribeHandle = Callable[[], None]
33# ============================================================================
34# EventBus
35# ============================================================================
37class EventBus:
38 """Thread-safe in-process pub/sub event bus.
40 Topics use dot-separated paths: 'agent.tool.call', 'system.shutdown'.
41 Wildcards: 'agent.*.call' (single level), 'agent.**' (multi-level).
42 """
44 def __init__(self):
45 self._subscribers: Dict[str, List[Subscriber]] = defaultdict(list)
46 self._lock = threading.RLock()
47 self._history: List[Event] = []
48 self._max_history: int = 1000
49 self._total_published: int = 0
50 self._total_delivered: int = 0
51 self._running: bool = True
53 def subscribe(self, topic: str, callback: Subscriber) -> UnsubscribeHandle:
54 """Subscribe to a topic (supports wildcards). Returns unsubscribe handle."""
55 with self._lock:
56 self._subscribers[topic].append(callback)
57 # Return a closure that removes this specific callback
58 sub_list = self._subscribers[topic]
60 def unsubscribe():
61 with self._lock:
62 if callback in sub_list:
63 sub_list.remove(callback)
65 return unsubscribe
67 def unsubscribe(self, topic: str, callback: Subscriber) -> bool:
68 """Remove a specific subscriber. Returns True if found and removed."""
69 with self._lock:
70 subs = self._subscribers.get(topic)
71 if subs and callback in subs:
72 subs.remove(callback)
73 return True
74 return False
76 def publish(self, topic: str, data: Any = None, source: str = "") -> None:
77 """Publish an event to all matching subscribers synchronously."""
78 event = Event(topic=topic, data=data, source=source)
79 with self._lock:
80 self._history.append(event)
81 if len(self._history) > self._max_history:
82 self._history = self._history[-self._max_history:]
83 self._total_published += 1
85 if not self._running:
86 return
88 for sub_topic, callbacks in list(self._subscribers.items()):
89 if self._topic_match(sub_topic, topic):
90 for cb in callbacks[:]: # copy for safe iteration
91 try:
92 cb(event)
93 self._total_delivered += 1
94 except Exception:
95 pass
97 def _topic_match(self, pattern: str, topic: str) -> bool:
98 """Match a topic against a pattern with wildcard support.
99 - `*` matches a single level (dot-delimited).
100 - `**` matches zero or more levels.
101 """
102 if '**' in pattern or '*' in pattern:
103 return self._wildcard_match(pattern, topic)
104 return pattern == topic
106 def _wildcard_match(self, pattern: str, topic: str) -> bool:
107 """fnmatch-style glob matching on dot-delimited topic paths."""
108 return fnmatch.fnmatch(topic, pattern)
110 def get_history(self, limit: int = 100) -> List[Event]:
111 """Get recent published events."""
112 with self._lock:
113 return list(self._history[-limit:])
115 def clear_history(self) -> None:
116 with self._lock:
117 self._history.clear()
119 def stop(self) -> None:
120 """Stop delivering events (still records history)."""
121 with self._lock:
122 self._running = False
124 def start(self) -> None:
125 with self._lock:
126 self._running = True
128 def subscriber_count(self, topic: Optional[str] = None) -> int:
129 """Count subscribers. If topic given, counts for that pattern only."""
130 with self._lock:
131 if topic:
132 return len(self._subscribers.get(topic, []))
133 return sum(len(v) for v in self._subscribers.values())
135 @property
136 def stats(self) -> Dict[str, Any]:
137 with self._lock:
138 return {
139 "total_published": self._total_published,
140 "total_delivered": self._total_delivered,
141 "subscriber_count": self.subscriber_count(),
142 "history_size": len(self._history),
143 "topics": list(self._subscribers.keys()),
144 }
147# ============================================================================
148# TopicFilter
149# ============================================================================
151class TopicFilter:
152 """Pre-compiled topic filter chain for high-throughput event routing."""
154 def __init__(self):
155 self._filters: Dict[str, Callable[[Event], bool]] = {}
156 self._lock = threading.Lock()
158 def add(self, name: str, predicate: Callable[[Event], bool]) -> None:
159 with self._lock:
160 self._filters[name] = predicate
162 def remove(self, name: str) -> bool:
163 with self._lock:
164 return self._filters.pop(name, None) is not None
166 def evaluate(self, event: Event) -> List[str]:
167 """Return names of all matching filters for this event."""
168 matches = []
169 with self._lock:
170 for name, pred in self._filters.items():
171 try:
172 if pred(event):
173 matches.append(name)
174 except Exception:
175 pass
176 return matches
179# ============================================================================
180# Global singleton
181# ============================================================================
183_default_bus: Optional[EventBus] = None
184_default_lock = threading.Lock()
187def get_event_bus() -> EventBus:
188 """Get or create the global default EventBus."""
189 global _default_bus
190 if _default_bus is None:
191 with _default_lock:
192 if _default_bus is None:
193 _default_bus = EventBus()
194 return _default_bus