Coverage for agentos/comm/layer.py: 34%

121 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Communication Layer for NexusAgent. 

3 

4Provides multiple communication patterns for multi-agent systems: 

5- Blackboard: Shared memory space 

6- EventBus: Publish-subscribe events 

7- Mailbox: Direct point-to-point messaging 

8""" 

9 

10from __future__ import annotations 

11 

12import asyncio 

13import time 

14import uuid 

15from dataclasses import dataclass, field 

16from typing import Any, Callable, Optional 

17 

18 

19@dataclass 

20class Message: 

21 """ 

22 Communication message. 

23 

24 Attributes: 

25 id: Unique identifier 

26 sender: Sender name 

27 receiver: Receiver name 

28 content: Message content 

29 metadata: Additional metadata 

30 timestamp: Message timestamp 

31 """ 

32 id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) 

33 sender: str = "" 

34 receiver: Optional[str] = None 

35 content: Any = None 

36 metadata: dict[str, Any] = field(default_factory=dict) 

37 timestamp: float = field(default_factory=time.time) 

38 

39 def to_dict(self) -> dict[str, Any]: 

40 """Convert to dict.""" 

41 return { 

42 "id": self.id, 

43 "sender": self.sender, 

44 "receiver": self.receiver, 

45 "content": self.content, 

46 "metadata": self.metadata, 

47 "timestamp": self.timestamp, 

48 } 

49 

50 

51class Blackboard: 

52 """ 

53 Shared memory space for agents. 

54 

55 Agents can read/write to a shared blackboard. 

56 Useful for collaborative problem solving. 

57 

58 Usage: 

59 blackboard = Blackboard() 

60 blackboard.write("agent1", "status", "working") 

61 status = blackboard.read("agent1", "status") 

62 """ 

63 

64 def __init__(self): 

65 """Initialize blackboard.""" 

66 self._data: dict[str, dict[str, Any]] = {} 

67 self._history: list[dict[str, Any]] = [] 

68 

69 def write( 

70 self, 

71 agent_name: str, 

72 key: str, 

73 value: Any, 

74 ) -> None: 

75 """ 

76 Write to blackboard. 

77 

78 Args: 

79 agent_name: Agent name 

80 key: Data key 

81 value: Data value 

82 """ 

83 if agent_name not in self._data: 

84 self._data[agent_name] = {} 

85 

86 self._data[agent_name][key] = value 

87 

88 # Record in history 

89 self._history.append({ 

90 "agent": agent_name, 

91 "key": key, 

92 "value": value, 

93 "timestamp": time.time(), 

94 }) 

95 

96 def read( 

97 self, 

98 agent_name: str, 

99 key: str, 

100 default: Any = None, 

101 ) -> Any: 

102 """ 

103 Read from blackboard. 

104 

105 Args: 

106 agent_name: Agent name 

107 key: Data key 

108 default: Default value if not found 

109 

110 Returns: 

111 Data value 

112 """ 

113 if agent_name not in self._data: 

114 return default 

115 

116 return self._data[agent_name].get(key, default) 

117 

118 def read_all(self, key: str) -> dict[str, Any]: 

119 """ 

120 Read key from all agents. 

121 

122 Args: 

123 key: Data key 

124 

125 Returns: 

126 Dict of agent_name -> value 

127 """ 

128 return { 

129 agent: data.get(key) 

130 for agent, data in self._data.items() 

131 if key in data 

132 } 

133 

134 def get_agent_data(self, agent_name: str) -> dict[str, Any]: 

135 """ 

136 Get all data for an agent. 

137 

138 Args: 

139 agent_name: Agent name 

140 

141 Returns: 

142 Dict of key -> value 

143 """ 

144 return self._data.get(agent_name, {}).copy() 

145 

146 def get_history( 

147 self, 

148 agent_name: Optional[str] = None, 

149 limit: int = 100, 

150 ) -> list[dict[str, Any]]: 

151 """ 

152 Get write history. 

153 

154 Args: 

155 agent_name: Filter by agent (None = all) 

156 limit: Max results 

157 

158 Returns: 

159 List of history entries 

160 """ 

161 history = self._history 

162 

163 if agent_name: 

164 history = [h for h in history if h["agent"] == agent_name] 

165 

166 return history[-limit:] 

167 

168 def clear(self, agent_name: Optional[str] = None) -> None: 

169 """ 

170 Clear blackboard. 

171 

172 Args: 

173 agent_name: Clear specific agent (None = all) 

174 """ 

175 if agent_name: 

176 self._data.pop(agent_name, None) 

177 else: 

178 self._data.clear() 

179 

180 

181class EventBus: 

182 """ 

183 Publish-subscribe event system. 

184 

185 Agents can subscribe to events and publish events. 

186 Useful for event-driven architectures. 

187 

188 Usage: 

189 bus = EventBus() 

190 

191 # Subscribe 

192 bus.subscribe("task_completed", callback) 

193 

194 # Publish 

195 bus.publish("task_completed", {"task_id": "123"}) 

196 """ 

197 

198 def __init__(self): 

199 """Initialize event bus.""" 

200 self._subscribers: dict[str, list[Callable[[Any], None]]] = {} 

201 self._history: list[dict[str, Any]] = [] 

202 

203 def subscribe( 

204 self, 

205 event_type: str, 

206 callback: Callable[[Any], None], 

207 ) -> None: 

208 """ 

209 Subscribe to an event. 

210 

211 Args: 

212 event_type: Event type 

213 callback: Callback function 

214 """ 

215 if event_type not in self._subscribers: 

216 self._subscribers[event_type] = [] 

217 

218 self._subscribers[event_type].append(callback) 

219 

220 def unsubscribe( 

221 self, 

222 event_type: str, 

223 callback: Callable[[Any], None], 

224 ) -> bool: 

225 """ 

226 Unsubscribe from an event. 

227 

228 Args: 

229 event_type: Event type 

230 callback: Callback function 

231 

232 Returns: 

233 True if unsubscribed, False if not found 

234 """ 

235 if event_type not in self._subscribers: 

236 return False 

237 

238 if callback in self._subscribers[event_type]: 

239 self._subscribers[event_type].remove(callback) 

240 return True 

241 

242 return False 

243 

244 def publish( 

245 self, 

246 event_type: str, 

247 data: Any = None, 

248 sender: str = "", 

249 ) -> int: 

250 """ 

251 Publish an event. 

252 

253 Args: 

254 event_type: Event type 

255 data: Event data 

256 sender: Sender name 

257 

258 Returns: 

259 Number of subscribers notified 

260 """ 

261 # Record in history 

262 self._history.append({ 

263 "event_type": event_type, 

264 "data": data, 

265 "sender": sender, 

266 "timestamp": time.time(), 

267 }) 

268 

269 # Notify subscribers 

270 subscribers = self._subscribers.get(event_type, []) 

271 for callback in subscribers: 

272 try: 

273 callback(data) 

274 except Exception: 

275 pass # Don't let one callback break others 

276 

277 return len(subscribers) 

278 

279 async def publish_async( 

280 self, 

281 event_type: str, 

282 data: Any = None, 

283 sender: str = "", 

284 ) -> int: 

285 """ 

286 Publish an event asynchronously. 

287 

288 Args: 

289 event_type: Event type 

290 data: Event data 

291 sender: Sender name 

292 

293 Returns: 

294 Number of subscribers notified 

295 """ 

296 # Record in history 

297 self._history.append({ 

298 "event_type": event_type, 

299 "data": data, 

300 "sender": sender, 

301 "timestamp": time.time(), 

302 }) 

303 

304 # Notify subscribers 

305 subscribers = self._subscribers.get(event_type, []) 

306 tasks = [] 

307 for callback in subscribers: 

308 if asyncio.iscoroutinefunction(callback): 

309 tasks.append(callback(data)) 

310 else: 

311 callback(data) 

312 

313 if tasks: 

314 await asyncio.gather(*tasks, return_exceptions=True) 

315 

316 return len(subscribers) 

317 

318 def get_history( 

319 self, 

320 event_type: Optional[str] = None, 

321 limit: int = 100, 

322 ) -> list[dict[str, Any]]: 

323 """ 

324 Get event history. 

325 

326 Args: 

327 event_type: Filter by event type (None = all) 

328 limit: Max results 

329 

330 Returns: 

331 List of history entries 

332 """ 

333 history = self._history 

334 

335 if event_type: 

336 history = [h for h in history if h["event_type"] == event_type] 

337 

338 return history[-limit:] 

339 

340 def clear(self) -> None: 

341 """Clear event bus.""" 

342 self._subscribers.clear() 

343 self._history.clear() 

344 

345 

346class Mailbox: 

347 """ 

348 Point-to-point messaging system. 

349 

350 Agents have mailboxes and can send/receive messages. 

351 Useful for direct communication. 

352 

353 Usage: 

354 mailbox = Mailbox() 

355 mailbox.send("agent1", "agent2", "Hello") 

356 messages = mailbox.receive("agent2") 

357 """ 

358 

359 def __init__(self): 

360 """Initialize mailbox system.""" 

361 self._mailboxes: dict[str, list[Message]] = {} 

362 self._sent: list[Message] = [] 

363 

364 def send( 

365 self, 

366 sender: str, 

367 receiver: str, 

368 content: Any, 

369 **metadata 

370 ) -> Message: 

371 """ 

372 Send a message. 

373 

374 Args: 

375 sender: Sender name 

376 receiver: Receiver name 

377 content: Message content 

378 **metadata: Additional metadata 

379 

380 Returns: 

381 Created Message 

382 """ 

383 message = Message( 

384 sender=sender, 

385 receiver=receiver, 

386 content=content, 

387 metadata=metadata, 

388 ) 

389 

390 # Add to receiver's mailbox 

391 if receiver not in self._mailboxes: 

392 self._mailboxes[receiver] = [] 

393 

394 self._mailboxes[receiver].append(message) 

395 self._sent.append(message) 

396 

397 return message 

398 

399 def receive( 

400 self, 

401 receiver: str, 

402 limit: int = 100, 

403 ) -> list[Message]: 

404 """ 

405 Receive messages. 

406 

407 Args: 

408 receiver: Receiver name 

409 limit: Max messages 

410 

411 Returns: 

412 List of messages 

413 """ 

414 messages = self._mailboxes.get(receiver, []) 

415 return messages[:limit] 

416 

417 def receive_and_clear( 

418 self, 

419 receiver: str, 

420 limit: int = 100, 

421 ) -> list[Message]: 

422 """ 

423 Receive and clear messages. 

424 

425 Args: 

426 receiver: Receiver name 

427 limit: Max messages 

428 

429 Returns: 

430 List of messages 

431 """ 

432 messages = self._mailboxes.get(receiver, [])[:limit] 

433 self._mailboxes[receiver] = self._mailboxes.get(receiver, [])[limit:] 

434 return messages 

435 

436 def get_sent( 

437 self, 

438 sender: Optional[str] = None, 

439 limit: int = 100, 

440 ) -> list[Message]: 

441 """ 

442 Get sent messages. 

443 

444 Args: 

445 sender: Filter by sender (None = all) 

446 limit: Max results 

447 

448 Returns: 

449 List of messages 

450 """ 

451 sent = self._sent 

452 

453 if sender: 

454 sent = [m for m in sent if m.sender == sender] 

455 

456 return sent[-limit:] 

457 

458 def clear(self, receiver: Optional[str] = None) -> None: 

459 """ 

460 Clear mailboxes. 

461 

462 Args: 

463 receiver: Clear specific receiver (None = all) 

464 """ 

465 if receiver: 

466 self._mailboxes.pop(receiver, None) 

467 else: 

468 self._mailboxes.clear() 

469 

470 

471class CommunicationLayer: 

472 """ 

473 Unified communication layer. 

474 

475 Combines Blackboard, EventBus, and Mailbox into 

476 a single interface. 

477 

478 Usage: 

479 comm = CommunicationLayer() 

480 

481 # Use blackboard 

482 comm.blackboard.write("agent1", "status", "working") 

483 

484 # Use event bus 

485 comm.event_bus.subscribe("task_completed", callback) 

486 

487 # Use mailbox 

488 comm.mailbox.send("agent1", "agent2", "Hello") 

489 """ 

490 

491 def __init__(self): 

492 """Initialize communication layer.""" 

493 self.blackboard = Blackboard() 

494 self.event_bus = EventBus() 

495 self.mailbox = Mailbox() 

496 

497 def clear(self) -> None: 

498 """Clear all communication channels.""" 

499 self.blackboard.clear() 

500 self.event_bus.clear() 

501 self.mailbox.clear()