Coverage for agentos/comm/layer.py: 34%
121 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
"""
Communication Layer for NexusAgent.

Provides multiple communication patterns for multi-agent systems:
- Blackboard: Shared memory space
- EventBus: Publish-subscribe events
- Mailbox: Direct point-to-point messaging
"""
from __future__ import annotations

import asyncio
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
@dataclass
class Message:
    """
    Communication message exchanged between agents.

    Attributes:
        id: Unique identifier; defaults to the first 12 hex characters
            of a random UUID4.
        sender: Sender name (empty string when not given).
        receiver: Receiver name, or None when not given.
        content: Message content.
        metadata: Additional metadata; each message gets its own dict.
        timestamp: Creation time as returned by ``time.time()``.
    """

    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    sender: str = ""
    receiver: Optional[str] = None
    content: Any = None
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to dict.

        The returned dict holds the attribute values themselves; ``content``
        and ``metadata`` are not copied, so mutating them through the result
        also mutates the message.

        Returns:
            Dict with the keys id, sender, receiver, content, metadata
            and timestamp.
        """
        return {
            "id": self.id,
            "sender": self.sender,
            "receiver": self.receiver,
            "content": self.content,
            "metadata": self.metadata,
            "timestamp": self.timestamp,
        }
class Blackboard:
    """
    Shared memory space for agents.

    Agents can read/write to a shared blackboard.
    Useful for collaborative problem solving.

    Data is stored per agent name as a key -> value mapping, and every
    write is additionally appended to an in-memory history list.

    Usage:
        blackboard = Blackboard()
        blackboard.write("agent1", "status", "working")
        status = blackboard.read("agent1", "status")
    """

    def __init__(self):
        """Initialize an empty blackboard with an empty write history."""
        self._data: dict[str, dict[str, Any]] = {}
        self._history: list[dict[str, Any]] = []

    def write(
        self,
        agent_name: str,
        key: str,
        value: Any,
    ) -> None:
        """
        Write to blackboard.

        Args:
            agent_name: Agent name
            key: Data key
            value: Data value
        """
        self._data.setdefault(agent_name, {})[key] = value

        # Record in history
        self._history.append({
            "agent": agent_name,
            "key": key,
            "value": value,
            "timestamp": time.time(),
        })

    def read(
        self,
        agent_name: str,
        key: str,
        default: Any = None,
    ) -> Any:
        """
        Read from blackboard.

        Args:
            agent_name: Agent name
            key: Data key
            default: Default value if the agent or the key is not found

        Returns:
            Data value
        """
        return self._data.get(agent_name, {}).get(key, default)

    def read_all(self, key: str) -> dict[str, Any]:
        """
        Read key from all agents.

        Agents that never wrote ``key`` are omitted from the result.

        Args:
            key: Data key

        Returns:
            Dict of agent_name -> value
        """
        return {
            agent: data[key]
            for agent, data in self._data.items()
            if key in data
        }

    def get_agent_data(self, agent_name: str) -> dict[str, Any]:
        """
        Get all data for an agent.

        Args:
            agent_name: Agent name

        Returns:
            Shallow copy of the agent's key -> value dict (empty if the
            agent is unknown)
        """
        return dict(self._data.get(agent_name, {}))

    def get_history(
        self,
        agent_name: Optional[str] = None,
        limit: int = 100,
    ) -> list[dict[str, Any]]:
        """
        Get write history.

        Args:
            agent_name: Filter by agent (None = all)
            limit: Max results; a value <= 0 yields an empty list

        Returns:
            List of the most recent history entries, oldest first
        """
        # history[-0:] would be the *whole* list, so handle non-positive
        # limits explicitly.
        if limit <= 0:
            return []

        history = self._history

        # Compare against None so the empty-string agent name can be filtered.
        if agent_name is not None:
            history = [h for h in history if h["agent"] == agent_name]

        return history[-limit:]

    def clear(self, agent_name: Optional[str] = None) -> None:
        """
        Clear blackboard.

        Only the stored data is removed; the write history is left as is.

        Args:
            agent_name: Clear specific agent (None = all)
        """
        # `is not None` keeps clear("") from wiping every agent's data.
        if agent_name is not None:
            self._data.pop(agent_name, None)
        else:
            self._data.clear()
class EventBus:
    """
    Publish-subscribe event system.

    Agents can subscribe to events and publish events.
    Useful for event-driven architectures.

    Every published event is also appended to an in-memory history list.

    Usage:
        bus = EventBus()

        # Subscribe
        bus.subscribe("task_completed", callback)

        # Publish
        bus.publish("task_completed", {"task_id": "123"})
    """

    def __init__(self):
        """Initialize event bus with no subscribers and an empty history."""
        self._subscribers: dict[str, list[Callable[[Any], None]]] = {}
        self._history: list[dict[str, Any]] = []

    def subscribe(
        self,
        event_type: str,
        callback: Callable[[Any], None],
    ) -> None:
        """
        Subscribe to an event.

        The same callback may be registered more than once; it is then
        called once per registration.

        Args:
            event_type: Event type
            callback: Callback function, called with the event data
        """
        self._subscribers.setdefault(event_type, []).append(callback)

    def unsubscribe(
        self,
        event_type: str,
        callback: Callable[[Any], None],
    ) -> bool:
        """
        Unsubscribe from an event.

        Removes the first registration of ``callback`` for ``event_type``.

        Args:
            event_type: Event type
            callback: Callback function

        Returns:
            True if unsubscribed, False if not found
        """
        callbacks = self._subscribers.get(event_type)
        if not callbacks or callback not in callbacks:
            return False

        callbacks.remove(callback)
        return True

    def publish(
        self,
        event_type: str,
        data: Any = None,
        sender: str = "",
    ) -> int:
        """
        Publish an event.

        Subscribers are called synchronously in registration order. A
        subscriber that raises is logged and does not stop delivery to the
        others. Return values of subscribers are discarded, so coroutine
        functions are called but not awaited here.

        Args:
            event_type: Event type
            data: Event data
            sender: Sender name

        Returns:
            Number of subscribers notified
        """
        self._record(event_type, data, sender)

        # Iterate over a snapshot so callbacks may subscribe/unsubscribe
        # during delivery without causing other subscribers to be skipped.
        subscribers = list(self._subscribers.get(event_type, ()))
        for callback in subscribers:
            self._notify(callback, event_type, data)

        return len(subscribers)

    async def publish_async(
        self,
        event_type: str,
        data: Any = None,
        sender: str = "",
    ) -> int:
        """
        Publish an event asynchronously.

        Plain callables are invoked immediately; coroutine functions are
        invoked and their coroutines awaited together. Failures of either
        kind are logged and do not prevent delivery to other subscribers.

        Args:
            event_type: Event type
            data: Event data
            sender: Sender name

        Returns:
            Number of subscribers notified
        """
        self._record(event_type, data, sender)

        # Snapshot for the same reason as in publish().
        subscribers = list(self._subscribers.get(event_type, ()))
        pending = []
        for callback in subscribers:
            outcome = self._notify(callback, event_type, data)
            if outcome is not None and asyncio.iscoroutinefunction(callback):
                pending.append(outcome)

        if pending:
            results = await asyncio.gather(*pending, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    logging.getLogger(__name__).error(
                        "EventBus async subscriber failed for event %r",
                        event_type,
                        exc_info=result,
                    )

        return len(subscribers)

    def get_history(
        self,
        event_type: Optional[str] = None,
        limit: int = 100,
    ) -> list[dict[str, Any]]:
        """
        Get event history.

        Args:
            event_type: Filter by event type (None = all)
            limit: Max results; a value <= 0 yields an empty list

        Returns:
            List of the most recent history entries, oldest first
        """
        # history[-0:] would be the *whole* list, so handle non-positive
        # limits explicitly.
        if limit <= 0:
            return []

        history = self._history

        # Compare against None so an empty-string event type can be filtered.
        if event_type is not None:
            history = [h for h in history if h["event_type"] == event_type]

        return history[-limit:]

    def clear(self) -> None:
        """Clear event bus: drop all subscribers and the event history."""
        self._subscribers.clear()
        self._history.clear()

    def _record(self, event_type: str, data: Any, sender: str) -> None:
        """Append one published event to the history."""
        self._history.append({
            "event_type": event_type,
            "data": data,
            "sender": sender,
            "timestamp": time.time(),
        })

    def _notify(
        self,
        callback: Callable[[Any], Any],
        event_type: str,
        data: Any,
    ) -> Any:
        """Call one subscriber; log and return None if it raises."""
        try:
            return callback(data)
        except Exception:
            # Don't let one callback break others, but leave a trace.
            logging.getLogger(__name__).exception(
                "EventBus subscriber %r failed for event %r",
                callback,
                event_type,
            )
            return None
class Mailbox:
    """
    Point-to-point messaging system.

    Agents have mailboxes and can send/receive messages.
    Useful for direct communication.

    Each receiver name maps to a list of pending messages; every sent
    message is additionally kept in a separate sent log.

    Usage:
        mailbox = Mailbox()
        mailbox.send("agent1", "agent2", "Hello")
        messages = mailbox.receive("agent2")
    """

    def __init__(self):
        """Initialize mailbox system with no mailboxes and an empty sent log."""
        self._mailboxes: dict[str, list[Message]] = {}
        self._sent: list[Message] = []

    def send(
        self,
        sender: str,
        receiver: str,
        content: Any,
        **metadata
    ) -> Message:
        """
        Send a message.

        Args:
            sender: Sender name
            receiver: Receiver name
            content: Message content
            **metadata: Additional metadata

        Returns:
            Created Message
        """
        message = Message(
            sender=sender,
            receiver=receiver,
            content=content,
            metadata=metadata,
        )

        # Add to receiver's mailbox (created on first use) and the sent log
        self._mailboxes.setdefault(receiver, []).append(message)
        self._sent.append(message)

        return message

    def receive(
        self,
        receiver: str,
        limit: int = 100,
    ) -> list[Message]:
        """
        Receive messages without removing them from the mailbox.

        Args:
            receiver: Receiver name
            limit: Max messages; a value <= 0 yields an empty list

        Returns:
            List of the oldest pending messages, oldest first
        """
        # Clamp so a negative limit cannot turn into an "all but the last
        # N" slice.
        return self._mailboxes.get(receiver, [])[:max(limit, 0)]

    def receive_and_clear(
        self,
        receiver: str,
        limit: int = 100,
    ) -> list[Message]:
        """
        Receive and clear messages.

        The returned messages are removed from the mailbox; messages beyond
        ``limit`` stay pending.

        Args:
            receiver: Receiver name
            limit: Max messages; a value <= 0 yields an empty list

        Returns:
            List of messages
        """
        inbox = self._mailboxes.get(receiver)
        # Unknown receiver: return early without creating a mailbox entry.
        if not inbox:
            return []

        count = max(limit, 0)
        taken = inbox[:count]
        remaining = inbox[count:]

        if remaining:
            self._mailboxes[receiver] = remaining
        else:
            # Drop drained mailboxes instead of keeping empty lists around.
            del self._mailboxes[receiver]

        return taken

    def get_sent(
        self,
        sender: Optional[str] = None,
        limit: int = 100,
    ) -> list[Message]:
        """
        Get sent messages.

        Args:
            sender: Filter by sender (None = all)
            limit: Max results; a value <= 0 yields an empty list

        Returns:
            List of the most recently sent messages, oldest first
        """
        # sent[-0:] would be the *whole* list, so handle non-positive
        # limits explicitly.
        if limit <= 0:
            return []

        sent = self._sent

        # Compare against None so the empty-string sender can be filtered.
        if sender is not None:
            sent = [m for m in sent if m.sender == sender]

        return sent[-limit:]

    def clear(self, receiver: Optional[str] = None) -> None:
        """
        Clear mailboxes.

        Only pending messages are removed; the sent log is left as is.

        Args:
            receiver: Clear specific receiver (None = all)
        """
        # `is not None` keeps clear("") from wiping every mailbox.
        if receiver is not None:
            self._mailboxes.pop(receiver, None)
        else:
            self._mailboxes.clear()
class CommunicationLayer:
    """
    Unified communication layer.

    Combines Blackboard, EventBus, and Mailbox into
    a single interface. The three channels are exposed as the public
    attributes ``blackboard``, ``event_bus`` and ``mailbox``.

    Usage:
        comm = CommunicationLayer()

        # Use blackboard
        comm.blackboard.write("agent1", "status", "working")

        # Use event bus
        comm.event_bus.subscribe("task_completed", callback)

        # Use mailbox
        comm.mailbox.send("agent1", "agent2", "Hello")
    """

    def __init__(self):
        """Initialize communication layer with one fresh instance per channel."""
        self.blackboard = Blackboard()
        self.event_bus = EventBus()
        self.mailbox = Mailbox()

    def clear(self) -> None:
        """
        Clear all communication channels.

        Calls ``clear()`` without arguments on the blackboard, the event
        bus and the mailbox, in that order.
        """
        self.blackboard.clear()
        self.event_bus.clear()
        self.mailbox.clear()