Coverage for agentos/orchestration/swarm_coordinator.py: 30%

402 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Swarm Coordinator v2 (v1.9.0) 

3 

4Production-grade multi-agent swarm orchestration. Turns individual agents 

5into a coordinated team with dynamic task allocation, inter-agent messaging, 

6and conflict resolution. 

7 

8New in v2: 

9 - Dynamic task allocation: workload-aware agent assignment 

10 - Inter-agent message bus: pub/sub, broadcasts, directed messages 

11 - Conflict resolver: detect and resolve agent disagreements 

12 - Consensus protocols: majority vote, ranked choice, weighted voting 

13 - Topology manager: star, mesh, ring, tree, DAG topologies 

14 - Health monitor: heartbeat, dead agent detection, auto-recovery 

15 - Swarm state snapshot: checkpoint/resume for entire swarms 

16""" 

17 

from __future__ import annotations

import asyncio
import inspect
import json
import logging
import time
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Optional, Callable, Awaitable

29 

30 

31# ── Types ─────────────────────────────────────────────────────────── 

32 

class AgentRole(str, Enum):
    """Role an agent plays inside the swarm (string-valued enum)."""

    # Orchestrates the swarm
    COORDINATOR = "coordinator"
    # Executes tasks
    WORKER = "worker"
    # Reviews outputs
    REVIEWER = "reviewer"
    # Monitors only
    OBSERVER = "observer"
    # Domain expert (code, data, etc.)
    SPECIALIST = "specialist"

39 

40 

class TaskPriority(str, Enum):
    """Priority level attached to a swarm task (string-valued enum)."""

    # Must complete, blocks everything
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

46 

47 

class TaskStatus(str, Enum):
    """Lifecycle state of a swarm task (string-valued enum)."""

    PENDING = "pending"
    ASSIGNED = "assigned"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"

55 

56 

class SwarmTopology(str, Enum):
    """Communication layout of the swarm (string-valued enum)."""

    # One coordinator, all others are workers
    STAR = "star"
    # Every agent can talk to every agent
    MESH = "mesh"
    # Circular communication
    RING = "ring"
    # Hierarchical
    TREE = "tree"
    # Workflow-based dependencies
    DAG = "dag"
    # Dynamic topology switching
    HYBRID = "hybrid"

64 

65 

@dataclass
class AgentInfo:
    """Descriptor for one agent registered in the swarm.

    Carries the agent's identity and role, its capability tags, its
    concurrency limit and current load, and liveness bookkeeping.
    """

    agent_id: str
    role: AgentRole
    capabilities: list[str] = field(default_factory=list)
    model: str = ""
    max_concurrency: int = 3
    current_load: int = 0
    is_alive: bool = True
    last_heartbeat: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_available(self) -> bool:
        """True when the agent is alive and below its concurrency limit."""
        has_capacity = self.current_load < self.max_concurrency
        return self.is_alive and has_capacity

82 

83 

@dataclass
class SwarmTask:
    """A unit of work submitted to the swarm.

    Besides its description and priority, a task tracks who it was assigned
    to, which other tasks it depends on, its outcome, timing and retry
    counters.
    """

    task_id: str
    description: str
    priority: TaskPriority = TaskPriority.MEDIUM
    status: TaskStatus = TaskStatus.PENDING
    # Agent ID
    assigned_to: str = ""
    required_capabilities: list[str] = field(default_factory=list)
    # DAG dependency
    parent_task_id: str = ""
    # Task IDs that must complete first
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    error: str = ""
    started_at: float = 0.0
    completed_at: float = 0.0
    retry_count: int = 0
    max_retries: int = 3
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_ready(self) -> bool:
        """True when the task is pending and has no outstanding dependencies."""
        if self.dependencies:
            return False
        return self.status == TaskStatus.PENDING

    @property
    def duration_ms(self) -> float:
        """Elapsed time between start and completion in milliseconds.

        Returns 0.0 while either timestamp is still unset (zero).
        """
        if not (self.completed_at and self.started_at):
            return 0.0
        elapsed_seconds = self.completed_at - self.started_at
        return elapsed_seconds * 1000

112 

113 

114# ── Inter-Agent Message Bus ───────────────────────────────────────── 

115 

@dataclass
class SwarmMessage:
    """Envelope for one message exchanged between agents in the swarm."""

    message_id: str
    from_agent: str
    # Empty = broadcast
    to_agent: str = ""
    topic: str = ""
    payload: Any = None
    timestamp: float = 0.0
    reply_to: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

127 

128 

class MessageBus:
    """Inter-agent pub/sub message bus.

    Supports:
    - Point-to-point: a message with ``to_agent`` set goes to that agent's queue
    - Broadcast: a message without ``to_agent`` goes to every known queue
      except the sender's
    - Topic-based: callbacks registered per (agent, topic); a subscription to
      topic ``"*"`` receives every message
    - Request-reply: async request correlated by ``message_id``
    """

    def __init__(self):
        # "<agent_id>:<topic>" -> callbacks registered for that pair
        self._subscribers: dict[str, list[Callable]] = defaultdict(list)
        self._message_log: list[SwarmMessage] = []
        # message_id -> future resolved by reply()
        self._pending_replies: dict[str, asyncio.Future] = {}
        # Queues are created lazily on first publish-to / receive-for an agent
        self._agent_queues: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

    def subscribe(self, agent_id: str, topic: str, callback: Callable):
        """Register ``callback`` (sync or async) for ``agent_id`` on ``topic``."""
        self._subscribers[f"{agent_id}:{topic}"].append(callback)

    def unsubscribe(self, agent_id: str, topic: str = ""):
        """Remove an agent's callbacks for one topic, or for all topics if empty."""
        if topic:
            self._subscribers.pop(f"{agent_id}:{topic}", None)
            return
        prefix = f"{agent_id}:"
        for key in [k for k in self._subscribers if k.startswith(prefix)]:
            del self._subscribers[key]

    async def publish(self, msg: SwarmMessage):
        """Log a message, enqueue it for its recipient(s) and run topic callbacks.

        A failing callback is logged and skipped so that one bad subscriber
        cannot prevent delivery to the others.
        """
        msg.timestamp = msg.timestamp or time.time()
        self._message_log.append(msg)

        if msg.to_agent:
            # Directed message
            await self._agent_queues[msg.to_agent].put(msg)
        else:
            # Broadcast to every known queue except the sender's own
            for agent_id in list(self._agent_queues):
                if agent_id != msg.from_agent:
                    await self._agent_queues[agent_id].put(msg)

        topic = msg.topic or "*"
        # Iterate over snapshots: callbacks may subscribe/unsubscribe, and we
        # yield to the event loop while awaiting async callbacks.
        for key, callbacks in list(self._subscribers.items()):
            _, sep, sub_topic = key.partition(":")
            if not sep:
                sub_topic = "*"
            if sub_topic != topic and sub_topic != "*":
                continue
            for callback in list(callbacks):
                try:
                    outcome = callback(msg)
                    # Await whatever awaitable the callback produced; this also
                    # covers partials/lambdas that wrap a coroutine function.
                    if inspect.isawaitable(outcome):
                        await outcome
                except Exception:
                    logging.getLogger(__name__).exception(
                        "Subscriber %r failed on topic %r", callback, topic
                    )

    async def receive(self, agent_id: str, timeout: float = 1.0) -> Optional[SwarmMessage]:
        """Return the next queued message for ``agent_id``, or None on timeout."""
        try:
            return await asyncio.wait_for(
                self._agent_queues[agent_id].get(),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            return None

    async def request(self, msg: SwarmMessage, timeout: float = 30.0) -> Any:
        """Publish ``msg`` and wait for a matching ``reply()``.

        Returns the reply value, or None if no reply arrives within ``timeout``.
        """
        future = asyncio.get_running_loop().create_future()
        self._pending_replies[msg.message_id] = future
        try:
            await self.publish(msg)
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            return None
        finally:
            # Always drop our entry (timeout, cancellation, publish error) so
            # abandoned futures do not accumulate.
            if self._pending_replies.get(msg.message_id) is future:
                del self._pending_replies[msg.message_id]

    def reply(self, original_msg: SwarmMessage, response: Any):
        """Resolve the pending request identified by ``original_msg.message_id``."""
        future = self._pending_replies.pop(original_msg.message_id, None)
        if future is not None and not future.done():
            future.set_result(response)

    def get_message_log(self, limit: int = 100) -> list[SwarmMessage]:
        """Return up to ``limit`` most recent messages (empty for limit <= 0)."""
        if limit <= 0:
            # A slice of [-0:] would return the whole log
            return []
        return self._message_log[-limit:]

221 

222 

223# ── Dynamic Task Allocator ────────────────────────────────────────── 

224 

class TaskAllocator:
    """Workload-aware dynamic task allocation.

    Scoring considers:
    - Agent capabilities vs task requirements
    - Current agent load (concurrency)
    - A bonus for specialists that match a required capability
    - Affinity (same agent as the parent task)

    Task priority is not part of the score.
    """

    def __init__(self):
        self._assignments: dict[str, str] = {}  # task_id -> agent_id

    def allocate(
        self,
        task: SwarmTask,
        agents: list[AgentInfo],
    ) -> Optional[str]:
        """Pick the best-scoring available agent for a task.

        The winner is always the highest-scoring available agent (ties go to
        the agent listed first), even when every score is zero or negative,
        and the choice is recorded so child tasks can prefer the same agent.

        Args:
            task: The task to allocate
            agents: Agents in the swarm; unavailable ones are ignored

        Returns:
            Agent ID if allocated, None if no agent is available.
        """
        candidates = [a for a in agents if a.is_available]
        if not candidates:
            return None

        # De-duplicate so repeated requirements do not skew the match ratio
        required = set(task.required_capabilities)
        parent_agent = (
            self._assignments.get(task.parent_task_id) if task.parent_task_id else None
        )

        best_agent = candidates[0]
        best_score = float("-inf")
        for agent in candidates:
            score = self._score(agent, required, parent_agent)
            # Strict comparison keeps the earliest candidate on ties
            if score > best_score:
                best_agent, best_score = agent, score

        self._assignments[task.task_id] = best_agent.agent_id
        return best_agent.agent_id

    def _score(self, agent: AgentInfo, required: set, parent_agent: Optional[str]) -> float:
        """Score one agent: capability match, specialist bonus, load, affinity."""
        score = 0.0

        if required:
            matched = required & set(agent.capabilities)
            # Up to 50 points, proportional to the share of requirements met
            score += len(matched) / len(required) * 50
            # Specialists get a bonus when they cover at least one requirement
            if matched and agent.role == AgentRole.SPECIALIST:
                score += 20

        # Workload penalty: prefer less loaded agents
        score -= agent.current_load * 10

        # Affinity: prefer the agent that handled the parent task
        if parent_agent is not None and parent_agent == agent.agent_id:
            score += 15

        return score

289 

290 

291# ── Conflict Resolver ─────────────────────────────────────────────── 

292 

class ConflictType(str, Enum):
    """Category of disagreement between agents (string-valued enum)."""

    # Disagreeing on facts
    FACTUAL = "factual"
    # Disagreeing on approach
    METHODOLOGICAL = "methodological"
    # Conflicting outputs
    OUTPUT = "output"
    # Competing for same resource
    RESOURCE = "resource"

298 

299 

class ConflictResolver:
    """Detect and resolve conflicts between agent outputs.

    Implemented resolution strategies:
    - "majority": cluster similar text outputs, largest cluster wins;
      numeric outputs are averaged
    - "weighted": highest-weighted agent's text wins; numeric outputs use a
      weighted average
    - anything else: the first output is returned
    """

    # Jaccard token similarity below this means two texts disagree.
    AGREEMENT_THRESHOLD = 0.3
    # Relative deviation from the mean above which a number is flagged.
    NUMERIC_DEVIATION_THRESHOLD = 0.5

    def __init__(self):
        self._conflict_log: list[dict] = []

    def detect_conflict(
        self,
        agent_outputs: dict[str, Any],
        expected_type: str = "text",
    ) -> list[dict]:
        """Detect conflicts between agent outputs.

        Text outputs are compared pairwise; numeric outputs are compared to
        their mean. Mixed or other output types yield no conflicts.

        Args:
            agent_outputs: {agent_id: output}
            expected_type: Currently unused; the check is chosen from the
                runtime types of the outputs

        Returns:
            List of detected conflicts.
        """
        conflicts: list[dict] = []
        if len(agent_outputs) < 2:
            return conflicts

        agents = list(agent_outputs)
        outputs = [agent_outputs[a] for a in agents]

        if all(isinstance(o, str) for o in outputs):
            # Text conflict: flag every pair that differs significantly
            for i, first in enumerate(agents):
                for second in agents[i + 1:]:
                    text_a, text_b = agent_outputs[first], agent_outputs[second]
                    similarity = self._text_similarity(text_a, text_b)
                    if similarity < self.AGREEMENT_THRESHOLD:
                        conflicts.append({
                            "type": ConflictType.OUTPUT.value,
                            "agents": [first, second],
                            "similarity": similarity,
                            "outputs": {first: text_a[:200], second: text_b[:200]},
                        })

        elif all(isinstance(o, (int, float)) for o in outputs):
            # Numeric conflict: flag values far from the mean. The scale is
            # floored at 1 so means near zero do not blow up the ratio.
            mean_val = sum(outputs) / len(outputs)
            scale = max(abs(mean_val), 1)
            for agent_id, val in zip(agents, outputs):
                deviation = abs(val - mean_val) / scale
                if deviation > self.NUMERIC_DEVIATION_THRESHOLD:
                    conflicts.append({
                        "type": ConflictType.FACTUAL.value,
                        "agents": [agent_id],
                        "value": val,
                        "mean": mean_val,
                        "deviation": deviation,
                    })

        return conflicts

    def resolve(
        self,
        agent_outputs: dict[str, Any],
        weights: dict[str, float] | None = None,
        strategy: str = "majority",
        expected_type: str = "text",
    ) -> dict[str, Any]:
        """Resolve conflicts and produce a final answer.

        Args:
            agent_outputs: {agent_id: output}
            weights: Optional agent weight mapping
            strategy: Resolution strategy ("majority" or "weighted")
            expected_type: Currently unused; dispatch is based on the runtime
                types of the outputs

        Returns:
            Dict with resolved output and resolution metadata. With no
            outputs at all, ``output`` is None and ``method`` is "none".
        """
        if not agent_outputs:
            return {"output": None, "method": "none", "conflict": False}

        outputs = list(agent_outputs.values())
        if len(outputs) == 1:
            return {"output": outputs[0], "method": "single_agent", "conflict": False}

        if all(isinstance(o, str) for o in outputs):
            return self._resolve_text(agent_outputs, weights, strategy)
        if all(isinstance(o, (int, float)) for o in outputs):
            return self._resolve_numeric(agent_outputs, weights, strategy)
        # Mixed or unsupported types cannot be compared: take the first
        return {"output": outputs[0], "method": "first", "conflict": True}

    def _resolve_text(self, outputs: dict[str, str], weights: dict[str, float] | None, strategy: str) -> dict:
        """Resolve text conflicts.

        "weighted" without weights falls back to "majority".
        """
        if not outputs:
            return {"output": None, "method": "none", "conflict": False}

        if strategy == "weighted" and weights:
            # Only agents that actually produced an output can win; agents
            # missing from ``weights`` get the neutral weight 1.0.
            best_agent = max(outputs, key=lambda a: weights.get(a, 1.0))
            return {"output": outputs[best_agent], "method": "weighted", "conflict": False}

        if strategy in ("majority", "weighted"):
            # Each cluster is a list of agent ids; its first member's text is
            # the cluster representative.
            clusters: list[list[str]] = []
            for agent_id, text in outputs.items():
                best_cluster = None
                best_sim = 0.0
                for members in clusters:
                    sim = self._text_similarity(outputs[members[0]], text)
                    if sim >= self.AGREEMENT_THRESHOLD and sim > best_sim:
                        best_cluster, best_sim = members, sim
                if best_cluster is None:
                    clusters.append([agent_id])
                else:
                    best_cluster.append(agent_id)

            # Largest cluster wins; ties go to the earliest cluster
            winner = max(clusters, key=len)
            votes: dict[str, int] = defaultdict(int)
            for members in clusters:
                votes[outputs[members[0]][:50]] += len(members)
            return {
                "output": outputs[winner[0]],
                "method": "majority",
                "votes": dict(votes),
                "conflict": len(clusters) > 1,
            }

        return {"output": next(iter(outputs.values())), "method": "first", "conflict": False}

    def _resolve_numeric(self, outputs: dict[str, float], weights: dict[str, float] | None, strategy: str) -> dict:
        """Resolve numeric conflicts by (weighted) averaging."""
        if strategy == "weighted" and weights:
            total_weight = sum(weights.get(a, 1.0) for a in outputs)
            # A zero total cannot be normalised; use the plain average instead
            if total_weight:
                weighted = sum(weights.get(a, 1.0) * v for a, v in outputs.items()) / total_weight
                return {"output": weighted, "method": "weighted_average", "conflict": False}

        avg = sum(outputs.values()) / len(outputs)
        return {"output": avg, "method": "average", "conflict": False}

    def _text_similarity(self, a: str, b: str) -> float:
        """Jaccard similarity over lower-cased whitespace tokens (1.0 if equal)."""
        if a == b:
            return 1.0
        tokens_a = set(a.lower().split())
        tokens_b = set(b.lower().split())
        if not tokens_a or not tokens_b:
            return 0.0
        return len(tokens_a & tokens_b) / len(tokens_a | tokens_b)

460 

461 

462# ── Swarm Coordinator ─────────────────────────────────────────────── 

463 

class SwarmCoordinator:
    """Orchestrates a swarm of agents.

    Usage:
        coordinator = SwarmCoordinator(topology=SwarmTopology.STAR)
        coordinator.register_agent(AgentInfo("agent_1", AgentRole.WORKER, capabilities=["code"]))
        coordinator.register_agent(AgentInfo("agent_2", AgentRole.WORKER, capabilities=["data"]))
        coordinator.register_agent(AgentInfo("agent_3", AgentRole.REVIEWER))

        task = SwarmTask("task_1", "Analyze data and generate report",
                         required_capabilities=["data", "code"])
        result = await coordinator.execute(task)
    """

    def __init__(
        self,
        topology: SwarmTopology = SwarmTopology.STAR,
        max_parallel_tasks: int = 10,
        heartbeat_interval: float = 5.0,
        heartbeat_timeout: float = 30.0,
    ):
        self.topology = topology
        self.max_parallel_tasks = max_parallel_tasks

        self._agents: dict[str, AgentInfo] = {}
        self._tasks: dict[str, SwarmTask] = {}
        self._bus = MessageBus()
        self._allocator = TaskAllocator()
        self._resolver = ConflictResolver()
        self._task_executor: Optional[Callable] = None

        # Health
        self._heartbeat_interval = heartbeat_interval
        self._heartbeat_timeout = heartbeat_timeout
        self._health_task: Optional[asyncio.Task] = None

        # State
        self._run_history: list[dict] = []
        self._started = False

    # ── Agent Management ──

    def register_agent(self, agent: AgentInfo):
        """Register an agent with the swarm and stamp its first heartbeat."""
        agent.last_heartbeat = time.time()
        self._agents[agent.agent_id] = agent

    def unregister_agent(self, agent_id: str):
        """Remove an agent from the swarm and drop its bus subscriptions."""
        self._agents.pop(agent_id, None)
        self._bus.unsubscribe(agent_id)

    def heartbeat(self, agent_id: str) -> bool:
        """Record a heartbeat for an agent, reviving it if it was marked dead.

        Without heartbeats the health monitor marks every agent dead once
        ``heartbeat_timeout`` seconds have passed since registration.

        Returns:
            True if the agent is registered, False otherwise.
        """
        agent = self._agents.get(agent_id)
        if agent is None:
            return False
        agent.last_heartbeat = time.time()
        agent.is_alive = True
        return True

    def set_task_executor(self, executor: Callable[[str, SwarmTask], Awaitable[Any]]):
        """Set the function that executes tasks on agents.

        Args:
            executor: async function(agent_id, task) -> result
        """
        self._task_executor = executor

    # ── Task Execution ──

    async def execute(self, task: SwarmTask) -> dict[str, Any]:
        """Execute a task on the best available agent, retrying on failure.

        Each attempt allocates a single agent through the task allocator and
        runs the configured executor (or a simulated result if none is set).
        A failed attempt is retried, with a fresh allocation, while
        ``task.retry_count < task.max_retries``. The coordinator is started
        automatically on first use.

        Returns:
            Result dict with ``task_id``, ``status``, ``agent_id``,
            ``result``, ``duration_ms`` and ``error``. If no agent is
            available the dict holds ``task_id``, ``status`` "failed" and
            ``error``.
        """
        if not self._started:
            await self.start()

        task.task_id = task.task_id or str(uuid.uuid4())[:8]
        self._tasks[task.task_id] = task

        attempts = 0
        while True:
            # Allocate
            agent_id = self._allocator.allocate(task, list(self._agents.values()))
            if not agent_id:
                failure = "No suitable agent available"
                task.status = TaskStatus.FAILED
                # Keep the error of a previous attempt if there was one
                task.error = task.error or failure
                return {"task_id": task.task_id, "status": "failed", "error": failure}

            # Assign and execute
            agent = self._agents[agent_id]
            task.assigned_to = agent_id
            task.status = TaskStatus.RUNNING
            task.error = ""  # do not carry a stale error into this attempt
            task.started_at = time.time()
            agent.current_load += 1
            attempts += 1

            failed = False
            try:
                if self._task_executor:
                    result = await self._task_executor(agent_id, task)
                else:
                    result = f"[Simulated] Agent '{agent_id}' completed: '{task.description}'"
            except asyncio.CancelledError:
                task.status = TaskStatus.CANCELED
                raise
            except Exception as exc:
                task.status = TaskStatus.FAILED
                task.error = str(exc)
                failed = True
            else:
                task.status = TaskStatus.COMPLETED
                task.result = result
            finally:
                # Release the agent before any retry so the allocator sees
                # its real load.
                agent.current_load -= 1
                task.completed_at = time.time()

            if failed and task.retry_count < task.max_retries:
                task.retry_count += 1
                task.status = TaskStatus.PENDING
                continue
            break

        # Only the final attempt is recorded
        self._run_history.append({
            "task_id": task.task_id,
            "agent_id": agent_id,
            "status": task.status.value,
            "duration_ms": task.duration_ms,
            "attempts": attempts,
        })

        return {
            "task_id": task.task_id,
            "status": task.status.value,
            "agent_id": agent_id,
            "result": task.result,
            "duration_ms": task.duration_ms,
            "error": task.error,
        }

    async def execute_parallel(self, tasks: list[SwarmTask]) -> list[dict[str, Any]]:
        """Execute multiple tasks in parallel across the swarm.

        Tasks without dependencies run first, in batches of at most
        ``max_parallel_tasks``. A waiting task becomes runnable once all of
        its dependencies have completed within this call; satisfied
        dependency IDs are removed from ``task.dependencies``. Tasks whose
        dependencies never complete are reported as failed instead of being
        dropped.

        Returns:
            One result dict per input task, in execution order.
        """
        ready: list[SwarmTask] = [t for t in tasks if not t.dependencies]
        waiting: list[SwarmTask] = [t for t in tasks if t.dependencies]

        results: list[dict[str, Any]] = []
        # Guard against a non-positive limit, which would never drain `ready`
        batch_size = max(1, self.max_parallel_tasks)

        while ready:
            batch, ready = ready[:batch_size], ready[batch_size:]
            batch_results = await asyncio.gather(*(self.execute(t) for t in batch))
            results.extend(batch_results)

            # Unblock dependent tasks
            completed_ids = {r["task_id"] for r in batch_results if r["status"] == "completed"}
            still_waiting: list[SwarmTask] = []
            for task in waiting:
                task.dependencies = [d for d in task.dependencies if d not in completed_ids]
                if task.dependencies:
                    still_waiting.append(task)
                else:
                    ready.append(task)
            waiting = still_waiting

        # Whatever is left depends on tasks that failed or were never submitted
        for task in waiting:
            task.status = TaskStatus.FAILED
            task.error = "Unmet dependencies: " + ", ".join(task.dependencies)
            results.append({
                "task_id": task.task_id,
                "status": task.status.value,
                "agent_id": "",
                "result": None,
                "duration_ms": 0.0,
                "error": task.error,
            })

        return results

    async def execute_with_review(
        self,
        task: SwarmTask,
        reviewer_count: int = 2,
        max_workers: int = 5,
    ) -> dict[str, Any]:
        """Execute a task several times, resolve the outputs and ask reviewers.

        One copy of the task is created per available worker/specialist (at
        most ``max_workers``) and run through ``execute_parallel``. The
        completed outputs are reconciled by the conflict resolver, and up to
        ``reviewer_count`` available reviewers are asked for notes over the
        message bus, concurrently, with a 15 second timeout each.

        NOTE: copies are placed by the allocator, not pinned to a worker, so
        several copies may land on the same agent; outputs are keyed by agent
        id and a later copy then replaces an earlier one.

        Returns:
            Dict with ``status``, resolved ``output``, resolution metadata,
            ``worker_count`` and ``reviewer_notes``; a "failed" dict if no
            worker is available or no copy completed.
        """
        workers = [
            a for a in self._agents.values()
            if a.role in (AgentRole.WORKER, AgentRole.SPECIALIST) and a.is_available
        ]
        reviewers = [
            a for a in self._agents.values()
            if a.role == AgentRole.REVIEWER and a.is_available
        ]

        if not workers:
            return {"status": "failed", "error": "No workers available"}

        # Parallel execution
        copies = min(len(workers), max(1, max_workers))
        worker_tasks = [
            SwarmTask(
                task_id=f"{task.task_id}_w{i}",
                description=task.description,
                # Copy so the clones never share the caller's list
                required_capabilities=list(task.required_capabilities),
            )
            for i in range(copies)
        ]
        parallel_results = await self.execute_parallel(worker_tasks)

        # Collect outputs
        agent_outputs: dict[str, Any] = {}
        agent_weights: dict[str, float] = {}
        for result in parallel_results:
            if result["status"] == "completed":
                agent_id = result["agent_id"]
                agent_outputs[agent_id] = result.get("result", "")
                agent_weights[agent_id] = 1.0

        if not agent_outputs:
            # Nothing to resolve or review
            return {
                "status": "failed",
                "error": "All worker executions failed",
                "worker_count": len(parallel_results),
            }

        # Resolve conflicts
        resolution = self._resolver.resolve(agent_outputs, agent_weights)

        # Reviewers audit: ask all of them at once instead of one after another
        review_requests = [
            self._bus.request(
                SwarmMessage(
                    message_id=str(uuid.uuid4()),
                    from_agent="coordinator",
                    to_agent=reviewer.agent_id,
                    topic="review",
                    payload={
                        "task": task.description,
                        "resolution": resolution,
                        "worker_outputs": agent_outputs,
                    },
                ),
                timeout=15.0,
            )
            for reviewer in reviewers[:max(0, reviewer_count)]
        ]
        replies = await asyncio.gather(*review_requests)
        # None means timeout; falsy replies such as False or {} are real answers
        reviewer_notes = [reply for reply in replies if reply is not None]

        return {
            "status": "completed",
            "output": resolution["output"],
            "resolution_method": resolution.get("method", "unknown"),
            "conflict_detected": resolution.get("conflict", False),
            "worker_count": len(parallel_results),
            "reviewer_notes": reviewer_notes,
        }

    # ── Swarm Communication ──

    async def broadcast(self, from_agent: str, topic: str, payload: Any):
        """Broadcast a message to all agents."""
        msg = SwarmMessage(
            message_id=str(uuid.uuid4()),
            from_agent=from_agent,
            topic=topic,
            payload=payload,
        )
        await self._bus.publish(msg)

    async def whisper(self, from_agent: str, to_agent: str, payload: Any):
        """Send a direct message to a specific agent."""
        msg = SwarmMessage(
            message_id=str(uuid.uuid4()),
            from_agent=from_agent,
            to_agent=to_agent,
            payload=payload,
        )
        await self._bus.publish(msg)

    # ── Health Monitoring ──

    async def start(self):
        """Start the swarm coordinator and its health monitor (idempotent)."""
        self._started = True
        # Never spawn a second monitor while one is still running
        if self._health_task is None or self._health_task.done():
            self._health_task = asyncio.create_task(self._health_monitor_loop())

    async def stop(self):
        """Stop the swarm coordinator and cancel the health monitor."""
        self._started = False
        health_task, self._health_task = self._health_task, None
        if health_task:
            health_task.cancel()
            try:
                await health_task
            except asyncio.CancelledError:
                pass

    async def _health_monitor_loop(self):
        """Background loop that marks agents dead when heartbeats go stale."""
        while self._started:
            now = time.time()
            newly_dead = []

            for agent in list(self._agents.values()):
                # Only react to the alive -> dead transition; otherwise every
                # cycle would re-handle agents that are already dead.
                if agent.is_alive and now - agent.last_heartbeat > self._heartbeat_timeout:
                    agent.is_alive = False
                    newly_dead.append(agent.agent_id)

            for agent_id in newly_dead:
                await self._handle_dead_agent(agent_id)

            await asyncio.sleep(self._heartbeat_interval)

    async def _handle_dead_agent(self, agent_id: str):
        """Handle a dead agent: release its running tasks and log the event."""
        agent = self._agents.get(agent_id)
        if not agent:
            return

        # Put the agent's running tasks back to pending and unassign them
        for task in self._tasks.values():
            if task.assigned_to == agent_id and task.status == TaskStatus.RUNNING:
                task.status = TaskStatus.PENDING
                task.assigned_to = ""
                task.retry_count += 1

        self._run_history.append({
            "event": "agent_dead",
            "agent_id": agent_id,
            "timestamp": time.time(),
        })

    # ── State & Reporting ──

    def snapshot(self) -> dict[str, Any]:
        """Create a snapshot of swarm state (for checkpoint/resume)."""
        return {
            "timestamp": time.time(),
            "topology": self.topology.value,
            "agents": {
                agent_id: {
                    "role": a.role.value,
                    # Copy so later changes to the agent do not alter the snapshot
                    "capabilities": list(a.capabilities),
                    "current_load": a.current_load,
                    "is_alive": a.is_alive,
                }
                for agent_id, a in self._agents.items()
            },
            "tasks": {
                tid: {
                    "description": t.description,
                    "status": t.status.value,
                    "assigned_to": t.assigned_to,
                    "priority": t.priority.value,
                }
                for tid, t in self._tasks.items()
            },
        }

    def get_stats(self) -> dict[str, Any]:
        """Get swarm statistics."""
        agents = list(self._agents.values())
        alive = [a for a in agents if a.is_alive]
        tasks = list(self._tasks.values())
        completed = [t for t in tasks if t.status == TaskStatus.COMPLETED]

        return {
            "agent_count": len(agents),
            "alive_agents": len(alive),
            "dead_agents": len(agents) - len(alive),
            "by_role": {
                role.value: sum(1 for a in agents if a.role == role)
                for role in AgentRole
            },
            "total_tasks": len(tasks),
            "completed_tasks": len(completed),
            "failed_tasks": sum(1 for t in tasks if t.status == TaskStatus.FAILED),
            "pending_tasks": sum(1 for t in tasks if t.status == TaskStatus.PENDING),
            "avg_task_duration_ms": (
                sum(t.duration_ms for t in completed) / len(completed)
                if completed else 0
            ),
            # Count the full log; get_message_log() defaults to the last 100
            "total_messages": len(self._bus._message_log),
            "total_runs": len(self._run_history),
        }

    def get_agent(self, agent_id: str) -> Optional[AgentInfo]:
        """Return the registered agent with this ID, or None."""
        return self._agents.get(agent_id)

    def get_task(self, task_id: str) -> Optional[SwarmTask]:
        """Return the known task with this ID, or None."""
        return self._tasks.get(task_id)