Coverage for agentos/orchestration/swarm_coordinator.py: 30%
402 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Swarm Coordinator v2 (v1.9.0)
4Production-grade multi-agent swarm orchestration. Turns individual agents
5into a coordinated team with dynamic task allocation, inter-agent messaging,
6and conflict resolution.
8New in v2:
9 - Dynamic task allocation: workload-aware agent assignment
10 - Inter-agent message bus: pub/sub, broadcasts, directed messages
11 - Conflict resolver: detect and resolve agent disagreements
12 - Consensus protocols: majority vote, ranked choice, weighted voting
13 - Topology manager: star, mesh, ring, tree, DAG topologies
14 - Health monitor: heartbeat, dead agent detection, auto-recovery
15 - Swarm state snapshot: checkpoint/resume for entire swarms
16"""
from __future__ import annotations

import asyncio
import inspect
import json
import logging
import time
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Awaitable, Callable, Optional
31# ── Types ───────────────────────────────────────────────────────────
class AgentRole(str, Enum):
    """Role an agent plays inside the swarm.

    Members are also plain strings, so they compare equal to their values.
    """

    # Orchestrates the swarm
    COORDINATOR = "coordinator"
    # Executes tasks
    WORKER = "worker"
    # Reviews outputs
    REVIEWER = "reviewer"
    # Monitors only
    OBSERVER = "observer"
    # Domain expert (code, data, etc.)
    SPECIALIST = "specialist"
class TaskPriority(str, Enum):
    """Urgency label attached to a swarm task."""

    # Must complete, blocks everything
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
class TaskStatus(str, Enum):
    """Lifecycle state of a swarm task."""

    PENDING = "pending"
    ASSIGNED = "assigned"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"
class SwarmTopology(str, Enum):
    """Communication layout of the swarm."""

    # One coordinator, all others are workers
    STAR = "star"
    # Every agent can talk to every agent
    MESH = "mesh"
    # Circular communication
    RING = "ring"
    # Hierarchical
    TREE = "tree"
    # Workflow-based dependencies
    DAG = "dag"
    # Dynamic topology switching
    HYBRID = "hybrid"
@dataclass
class AgentInfo:
    """Record describing one agent registered with the swarm.

    Carries the agent's identity, role and advertised capabilities together
    with the load and liveness fields that ``is_available`` reads.
    """

    agent_id: str
    role: AgentRole
    capabilities: list[str] = field(default_factory=list)
    model: str = ""
    # Upper bound compared against ``current_load`` by ``is_available``.
    max_concurrency: int = 3
    current_load: int = 0
    is_alive: bool = True
    last_heartbeat: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_available(self) -> bool:
        """True when the agent is alive and below its concurrency limit."""
        has_capacity = self.current_load < self.max_concurrency
        return self.is_alive and has_capacity
@dataclass
class SwarmTask:
    """A unit of work submitted to the swarm, plus its execution bookkeeping."""

    task_id: str
    description: str
    priority: TaskPriority = TaskPriority.MEDIUM
    status: TaskStatus = TaskStatus.PENDING
    # Agent ID
    assigned_to: str = ""
    required_capabilities: list[str] = field(default_factory=list)
    # DAG dependency
    parent_task_id: str = ""
    # Task IDs that must complete first
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    error: str = ""
    started_at: float = 0.0
    completed_at: float = 0.0
    retry_count: int = 0
    max_retries: int = 3
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_ready(self) -> bool:
        """True for a pending task whose ``dependencies`` list is empty."""
        if self.status != TaskStatus.PENDING:
            return False
        return not self.dependencies

    @property
    def duration_ms(self) -> float:
        """Elapsed time between start and completion in milliseconds.

        Returns 0.0 while either timestamp is still unset (zero).
        """
        if not (self.completed_at and self.started_at):
            return 0.0
        elapsed_seconds = self.completed_at - self.started_at
        return elapsed_seconds * 1000
114# ── Inter-Agent Message Bus ─────────────────────────────────────────
@dataclass
class SwarmMessage:
    """Envelope for one message exchanged between swarm agents."""

    message_id: str
    from_agent: str
    # Empty = broadcast
    to_agent: str = ""
    topic: str = ""
    payload: Any = None
    timestamp: float = 0.0
    reply_to: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
class MessageBus:
    """Inter-agent pub/sub message bus.

    Supports:
      - Point-to-point: a message with ``to_agent`` set goes to that agent's queue
      - Broadcast: a message without ``to_agent`` goes to every agent that
        already has a queue (queues are created on first send or receive),
        except the sender
      - Topic-based: callbacks registered per (agent, topic) pair
      - Request-reply: ``request`` waits for a ``reply`` carrying the same
        message id
    """

    def __init__(self):
        # Keyed by (agent_id, topic) rather than a joined "agent:topic" string
        # so agent ids or topics that contain ":" cannot be confused.
        self._subscribers: dict[tuple[str, str], list[Callable]] = defaultdict(list)
        self._message_log: list[SwarmMessage] = []
        self._pending_replies: dict[str, asyncio.Future] = {}
        self._agent_queues: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

    def subscribe(self, agent_id: str, topic: str, callback: Callable):
        """Register ``callback`` for ``topic`` on behalf of ``agent_id``.

        A topic of ``"*"`` receives every published message. The callback may
        be a plain function or return an awaitable.
        """
        self._subscribers[(agent_id, topic)].append(callback)

    def unsubscribe(self, agent_id: str, topic: str = ""):
        """Drop the agent's callbacks for ``topic``, or all of them if empty."""
        if topic:
            self._subscribers.pop((agent_id, topic), None)
            return
        # Exact match on the agent id: unsubscribing "a" must not touch "a:b".
        for key in [k for k in self._subscribers if k[0] == agent_id]:
            del self._subscribers[key]

    async def publish(self, msg: SwarmMessage):
        """Log ``msg``, enqueue it for its recipients and notify subscribers.

        Subscriber callbacks are best-effort: an exception raised by one is
        logged and does not prevent delivery to the others.
        """
        msg.timestamp = msg.timestamp or time.time()
        self._message_log.append(msg)

        if msg.to_agent:
            await self._agent_queues[msg.to_agent].put(msg)
        else:
            # Broadcast to every known queue except the sender's own.
            for agent_id in list(self._agent_queues):
                if agent_id != msg.from_agent:
                    await self._agent_queues[agent_id].put(msg)

        topic = msg.topic or "*"
        # Iterate over snapshots: a callback may subscribe/unsubscribe while
        # we are awaiting, which would otherwise invalidate the iteration.
        for (_agent_id, sub_topic), callbacks in list(self._subscribers.items()):
            if sub_topic != topic and sub_topic != "*":
                continue
            for callback in list(callbacks):
                try:
                    outcome = callback(msg)
                    # Covers coroutine functions as well as partials/callable
                    # objects that return an awaitable.
                    if inspect.isawaitable(outcome):
                        await outcome
                except Exception:
                    logging.getLogger(__name__).exception(
                        "Subscriber callback failed for topic %r", topic
                    )

    async def receive(self, agent_id: str, timeout: float = 1.0) -> Optional[SwarmMessage]:
        """Return the next queued message for ``agent_id`` or None on timeout."""
        try:
            return await asyncio.wait_for(
                self._agent_queues[agent_id].get(),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            return None

    async def request(self, msg: SwarmMessage, timeout: float = 30.0) -> Any:
        """Publish ``msg`` and wait for a matching ``reply``.

        Returns the reply value, or None if none arrives within ``timeout``
        seconds.
        """
        reply_future = asyncio.get_running_loop().create_future()
        self._pending_replies[msg.message_id] = reply_future
        try:
            await self.publish(msg)
            return await asyncio.wait_for(reply_future, timeout=timeout)
        except asyncio.TimeoutError:
            return None
        finally:
            # Always forget the correlation entry (timeout, cancellation or a
            # failing publish) so abandoned requests do not accumulate.
            self._pending_replies.pop(msg.message_id, None)

    def reply(self, original_msg: SwarmMessage, response: Any):
        """Resolve the pending request identified by ``original_msg``.

        Does nothing when no request with that message id is waiting.
        """
        reply_future = self._pending_replies.pop(original_msg.message_id, None)
        if reply_future is not None and not reply_future.done():
            reply_future.set_result(response)

    def get_message_log(self, limit: int = 100) -> list[SwarmMessage]:
        """Return up to ``limit`` most recent messages (for debugging)."""
        # ``lst[-0:]`` is the whole list, so non-positive limits need a guard.
        if limit <= 0:
            return []
        return self._message_log[-limit:]
223# ── Dynamic Task Allocator ──────────────────────────────────────────
class TaskAllocator:
    """Workload-aware dynamic task allocation.

    Each available agent is scored on:
      - Capability match: share of the task's required capabilities it has
      - Current load: every task in flight costs points
      - Specialist bonus: a specialist holding a required capability
      - Affinity: the agent that handled the task's parent

    Task priority is not part of the score.
    """

    def __init__(self):
        # task_id -> agent_id of the most recent allocation
        self._assignments: dict[str, str] = {}

    def allocate(
        self,
        task: SwarmTask,
        agents: list[AgentInfo],
    ) -> Optional[str]:
        """Pick the best available agent for ``task`` and remember the choice.

        Args:
            task: The task to allocate
            agents: Agents in the swarm; unavailable ones are ignored

        Returns:
            The chosen agent's ID, or None when no agent is available.
            Ties go to the agent listed first.
        """
        candidates = [agent for agent in agents if agent.is_available]
        if not candidates:
            return None

        # De-duplicate so a capability listed twice cannot deflate the ratio.
        required = set(task.required_capabilities)
        parent_agent_id = (
            self._assignments.get(task.parent_task_id) if task.parent_task_id else None
        )

        def score(agent: AgentInfo) -> float:
            overlap = len(required & set(agent.capabilities)) if required else 0
            points = (overlap / len(required)) * 50 if required else 0.0
            # Workload penalty: prefer less loaded agents
            points -= agent.current_load * 10
            # Specialist bonus only applies when a required capability matches
            if overlap and agent.role == AgentRole.SPECIALIST:
                points += 20
            # Affinity: prefer reusing the parent task's agent
            if parent_agent_id is not None and parent_agent_id == agent.agent_id:
                points += 15
            return points

        # Scores are only meaningful relative to each other, so the highest
        # one always wins, even when it is zero or negative (for example a
        # task without required capabilities, or a swarm where everyone is
        # busy). ``max`` keeps the first agent among equals.
        best = max(candidates, key=score)
        self._assignments[task.task_id] = best.agent_id
        return best.agent_id
291# ── Conflict Resolver ───────────────────────────────────────────────
class ConflictType(str, Enum):
    """Category of disagreement between agents."""

    # Disagreeing on facts
    FACTUAL = "factual"
    # Disagreeing on approach
    METHODOLOGICAL = "methodological"
    # Conflicting outputs
    OUTPUT = "output"
    # Competing for same resource
    RESOURCE = "resource"
class ConflictResolver:
    """Detect and resolve conflicts between agent outputs.

    Implemented resolution strategies:
      - "majority": text outputs are grouped by their most similar peer and
        the largest group wins; numeric outputs are averaged
      - "weighted": the highest-weighted agent's text wins; numeric outputs
        are combined as a weighted average
      - anything else: the first text output is returned
    """

    def __init__(self):
        self._conflict_log: list[dict] = []

    def detect_conflict(
        self,
        agent_outputs: dict[str, Any],
        expected_type: str = "text",
    ) -> list[dict]:
        """Detect conflicts between agent outputs.

        Args:
            agent_outputs: {agent_id: output}
            expected_type: Accepted for interface compatibility; detection is
                driven by the runtime type of the outputs.

        Returns:
            List of detected conflicts. All-string outputs yield one entry per
            pair whose token similarity is below 0.3; all-numeric outputs
            yield one entry per value deviating from the mean by more than
            50%. Mixed or other types yield no conflicts.
        """
        conflicts: list[dict] = []
        agents = list(agent_outputs)
        if len(agents) < 2:
            return conflicts

        outputs = [agent_outputs[agent_id] for agent_id in agents]

        if all(isinstance(o, str) for o in outputs):
            for i, first in enumerate(outputs):
                for j in range(i + 1, len(outputs)):
                    second = outputs[j]
                    similarity = self._text_similarity(first, second)
                    if similarity < 0.3:  # Significant disagreement
                        conflicts.append({
                            "type": ConflictType.OUTPUT.value,
                            "agents": [agents[i], agents[j]],
                            "similarity": similarity,
                            "outputs": {agents[i]: first[:200], agents[j]: second[:200]},
                        })

        elif all(isinstance(o, (int, float)) for o in outputs):
            mean_val = sum(outputs) / len(outputs)
            # Floor the denominator at 1 so means near zero do not blow up
            # the relative deviation.
            scale = max(abs(mean_val), 1)
            for agent_id, val in zip(agents, outputs):
                deviation = abs(val - mean_val) / scale
                if deviation > 0.5:
                    conflicts.append({
                        "type": ConflictType.FACTUAL.value,
                        "agents": [agent_id],
                        "value": val,
                        "mean": mean_val,
                        "deviation": deviation,
                    })

        return conflicts

    def resolve(
        self,
        agent_outputs: dict[str, Any],
        weights: dict[str, float] | None = None,
        strategy: str = "majority",
        expected_type: str = "text",
    ) -> dict[str, Any]:
        """Resolve conflicts and produce a final answer.

        Args:
            agent_outputs: {agent_id: output}
            weights: Optional agent weight mapping; agents without an entry
                count with weight 1.0
            strategy: "majority" or "weighted"
            expected_type: Accepted for interface compatibility; dispatch is
                driven by the runtime type of the outputs.

        Returns:
            Dict with at least "output", "method" and "conflict". With no
            outputs at all, "output" is None and "method" is "none".
        """
        if not agent_outputs:
            # Nothing to compare; avoid max() on an empty vote table below.
            return {"output": None, "method": "none", "conflict": False}

        outputs = list(agent_outputs.values())
        if len(outputs) == 1:
            return {"output": outputs[0], "method": "single_agent", "conflict": False}

        if all(isinstance(o, str) for o in outputs):
            return self._resolve_text(agent_outputs, weights, strategy)
        if all(isinstance(o, (int, float)) for o in outputs):
            return self._resolve_numeric(agent_outputs, weights, strategy)
        # Mixed types cannot be compared; report a conflict and keep the first.
        return {"output": outputs[0], "method": "first", "conflict": True}

    def _resolve_text(self, outputs: dict[str, str], weights: dict[str, float] | None, strategy: str) -> dict:
        """Resolve text conflicts."""
        if strategy == "weighted" and not weights:
            strategy = "majority"

        if strategy == "majority":
            # Each agent votes for the output of its most similar peer (or its
            # own when it resembles nobody); votes are bucketed by the first
            # 50 characters of that output.
            votes: dict[str, list[str]] = defaultdict(list)
            for voter, text in outputs.items():
                best_match, best_sim = voter, 0.0
                for other, other_text in outputs.items():
                    if other == voter:
                        continue
                    sim = self._text_similarity(text, other_text)
                    if sim > best_sim:
                        best_sim, best_match = sim, other
                votes[outputs[best_match][:50]].append(voter)

            # Largest group wins; the first voter's output represents it.
            winning_key = max(votes, key=lambda k: len(votes[k]))
            winning_agent = votes[winning_key][0]
            return {
                "output": outputs[winning_agent],
                "method": "majority",
                "votes": {k: len(v) for k, v in votes.items()},
                "conflict": len(votes) > 1,
            }

        if strategy == "weighted":
            # Only agents that actually produced an output may win; a weight
            # entry for an absent agent must not decide the result.
            best_agent = max(outputs, key=lambda agent_id: weights.get(agent_id, 1.0))
            return {"output": outputs[best_agent], "method": "weighted", "conflict": False}

        return {"output": next(iter(outputs.values())), "method": "first", "conflict": False}

    def _resolve_numeric(self, outputs: dict[str, float], weights: dict[str, float] | None, strategy: str) -> dict:
        """Resolve numeric conflicts by (weighted) averaging."""
        if strategy == "weighted" and weights:
            total_weight = sum(weights.get(agent_id, 1.0) for agent_id in outputs)
            # Weights summing to zero carry no information; use the plain mean
            # instead of dividing by zero.
            if total_weight:
                weighted = sum(
                    weights.get(agent_id, 1.0) * value for agent_id, value in outputs.items()
                ) / total_weight
                return {"output": weighted, "method": "weighted_average", "conflict": False}

        values = list(outputs.values())
        return {"output": sum(values) / len(values), "method": "average", "conflict": False}

    def _text_similarity(self, a: str, b: str) -> float:
        """Jaccard similarity of the lower-cased whitespace token sets."""
        if a == b:
            return 1.0
        tokens_a = set(a.lower().split())
        tokens_b = set(b.lower().split())
        if not tokens_a or not tokens_b:
            return 0.0
        return len(tokens_a & tokens_b) / len(tokens_a | tokens_b)
462# ── Swarm Coordinator ───────────────────────────────────────────────
class SwarmCoordinator:
    """Orchestrates a swarm of agents.

    Usage:
        coordinator = SwarmCoordinator(topology=SwarmTopology.STAR)
        coordinator.register_agent(AgentInfo("agent_1", AgentRole.WORKER, capabilities=["code"]))
        coordinator.register_agent(AgentInfo("agent_2", AgentRole.WORKER, capabilities=["data"]))
        coordinator.register_agent(AgentInfo("agent_3", AgentRole.REVIEWER))

        task = SwarmTask("task_1", "Analyze data and generate report",
                         required_capabilities=["data", "code"])
        result = await coordinator.execute(task)

    Agents are considered dead once ``heartbeat_timeout`` seconds pass without
    a call to ``heartbeat`` (or a successfully completed task) for them.
    """

    def __init__(
        self,
        topology: SwarmTopology = SwarmTopology.STAR,
        max_parallel_tasks: int = 10,
        heartbeat_interval: float = 5.0,
        heartbeat_timeout: float = 30.0,
    ):
        self.topology = topology
        self.max_parallel_tasks = max_parallel_tasks

        self._agents: dict[str, AgentInfo] = {}
        self._tasks: dict[str, SwarmTask] = {}
        self._bus = MessageBus()
        self._allocator = TaskAllocator()
        self._resolver = ConflictResolver()
        self._task_executor: Optional[Callable] = None

        # Health
        self._heartbeat_interval = heartbeat_interval
        self._heartbeat_timeout = heartbeat_timeout
        self._health_task: Optional[asyncio.Task] = None

        # State
        self._run_history: list[dict] = []
        self._started = False

    # ── Agent Management ──

    def register_agent(self, agent: AgentInfo):
        """Register an agent with the swarm and stamp its first heartbeat."""
        agent.last_heartbeat = time.time()
        self._agents[agent.agent_id] = agent

    def unregister_agent(self, agent_id: str):
        """Remove an agent from the swarm and drop its bus subscriptions."""
        self._agents.pop(agent_id, None)
        self._bus.unsubscribe(agent_id)

    def heartbeat(self, agent_id: str) -> bool:
        """Record a sign of life from ``agent_id``.

        Refreshes the agent's heartbeat timestamp and marks it alive again if
        the health monitor had declared it dead.

        Returns:
            True if the agent is registered, False otherwise.
        """
        agent = self._agents.get(agent_id)
        if agent is None:
            return False
        agent.last_heartbeat = time.time()
        agent.is_alive = True
        return True

    def set_task_executor(self, executor: Callable[[str, SwarmTask], Awaitable[Any]]):
        """Set the function that executes tasks on agents.

        Args:
            executor: async function(agent_id, task) -> result
        """
        self._task_executor = executor

    # ── Task Execution ──

    async def execute(self, task: SwarmTask) -> dict[str, Any]:
        """Execute a task on the best available agent.

        The allocator chooses an agent; when the executor raises, the task is
        re-allocated and retried until ``task.max_retries`` is exhausted.
        Without a configured executor a simulated result string is produced.

        Returns:
            Dict with "task_id", "status", "agent_id", "result",
            "duration_ms" and "error". "status" is "failed" when no agent
            could be allocated or every attempt raised.
        """
        return await self._execute_task(task)

    async def _execute_task(self, task: SwarmTask, pinned_agent_id: str = "") -> dict[str, Any]:
        """Run ``task`` with retries, on ``pinned_agent_id`` if given."""
        if not self._started:
            await self.start()

        task.task_id = task.task_id or str(uuid.uuid4())[:8]
        self._tasks[task.task_id] = task

        # Iterative retry: each attempt releases the agent's load slot before
        # the next allocation, so an agent with a single slot can be retried.
        while True:
            agent_id = pinned_agent_id or self._allocator.allocate(task, list(self._agents.values()))
            agent = self._agents.get(agent_id) if agent_id else None
            if agent is None or (pinned_agent_id and not agent.is_available):
                task.status = TaskStatus.FAILED
                task.error = "No suitable agent available"
                return self._task_report(task, "")

            await self._run_on_agent(agent, task)
            if task.status == TaskStatus.COMPLETED or task.retry_count >= task.max_retries:
                break
            task.retry_count += 1
            task.status = TaskStatus.PENDING

        # One history record per task, describing its final attempt.
        self._run_history.append({
            "task_id": task.task_id,
            "agent_id": agent.agent_id,
            "status": task.status.value,
            "duration_ms": task.duration_ms,
        })
        return self._task_report(task, agent.agent_id)

    async def _run_on_agent(self, agent: AgentInfo, task: SwarmTask) -> None:
        """Perform a single attempt of ``task`` on ``agent`` and record the outcome on the task."""
        task.assigned_to = agent.agent_id
        task.status = TaskStatus.RUNNING
        task.started_at = time.time()
        agent.current_load += 1
        try:
            if self._task_executor:
                outcome = await self._task_executor(agent.agent_id, task)
            else:
                outcome = f"[Simulated] Agent '{agent.agent_id}' completed: '{task.description}'"
        except Exception as exc:
            task.status = TaskStatus.FAILED
            task.error = str(exc)
        else:
            task.status = TaskStatus.COMPLETED
            task.result = outcome
            # Clear the message left behind by an earlier failed attempt.
            task.error = ""
            # Finishing a task proves the agent is responsive.
            agent.last_heartbeat = time.time()
        finally:
            agent.current_load -= 1
            task.completed_at = time.time()

    @staticmethod
    def _task_report(task: SwarmTask, agent_id: str) -> dict[str, Any]:
        """Build the result dict returned to callers for ``task``."""
        return {
            "task_id": task.task_id,
            "status": task.status.value,
            "agent_id": agent_id,
            "result": task.result,
            "duration_ms": task.duration_ms,
            "error": task.error,
        }

    async def execute_parallel(self, tasks: list[SwarmTask]) -> list[dict[str, Any]]:
        """Execute multiple tasks in parallel across the swarm.

        Uses DAG dependency resolution: tasks with unmet dependencies are
        deferred until their prerequisites complete. Satisfied entries are
        removed from each task's ``dependencies`` list as they complete.
        Dependencies on tasks this coordinator already completed count as met.

        Returns:
            One result dict per input task. Tasks whose dependencies never
            complete are marked canceled and reported with an explanatory
            error instead of being dropped.
        """
        already_done = {
            task_id for task_id, known in self._tasks.items()
            if known.status == TaskStatus.COMPLETED
        }

        ready: list[SwarmTask] = []
        waiting: list[SwarmTask] = []
        for task in tasks:
            task.dependencies = [d for d in task.dependencies if d not in already_done]
            (waiting if task.dependencies else ready).append(task)

        results: list[dict[str, Any]] = []
        # A non-positive limit would produce empty batches forever.
        batch_size = max(1, self.max_parallel_tasks)

        while ready:
            batch, ready = ready[:batch_size], ready[batch_size:]
            batch_results = await asyncio.gather(*(self.execute(t) for t in batch))
            results.extend(batch_results)

            # Unblock dependent tasks
            completed_ids = {r["task_id"] for r in batch_results if r["status"] == "completed"}
            still_waiting = []
            for task in waiting:
                task.dependencies = [d for d in task.dependencies if d not in completed_ids]
                if task.dependencies:
                    still_waiting.append(task)
                else:
                    ready.append(task)
            waiting = still_waiting

        # Whatever is still waiting depends on something that failed, was
        # never submitted, or forms a cycle.
        for task in waiting:
            task.status = TaskStatus.CANCELED
            task.error = "Unmet dependencies: " + ", ".join(task.dependencies)
            if task.task_id:
                self._tasks.setdefault(task.task_id, task)
            results.append(self._task_report(task, ""))

        return results

    async def execute_with_review(
        self,
        task: SwarmTask,
        reviewer_count: int = 2,
        max_workers: int = 5,
    ) -> dict[str, Any]:
        """Execute a task with multiple agents and review results.

        Up to ``max_workers`` available workers/specialists each execute their
        own copy of the task. Results are compared, conflicts resolved, and
        up to ``reviewer_count`` available reviewers are asked for notes over
        the message bus (15 s reply timeout, requests sent concurrently).

        Returns:
            On success a dict with "status", "output", "resolution_method",
            "conflict_detected", "worker_count" and "reviewer_notes"; a dict
            with "status": "failed" and "error" when there are no workers or
            every worker failed.
        """
        workers = [
            a for a in self._agents.values()
            if a.role in (AgentRole.WORKER, AgentRole.SPECIALIST) and a.is_available
        ]
        reviewers = [
            a for a in self._agents.values()
            if a.role == AgentRole.REVIEWER and a.is_available
        ]

        if not workers:
            return {"status": "failed", "error": "No workers available"}

        gate = asyncio.Semaphore(max(1, self.max_parallel_tasks))

        async def run_copy(index: int, worker: AgentInfo) -> dict[str, Any]:
            # Pin each copy to its own worker; going through the allocator
            # could place several copies on the same agent or on a non-worker.
            copy = SwarmTask(
                task_id=f"{task.task_id}_w{index}",
                description=task.description,
                required_capabilities=list(task.required_capabilities),
            )
            async with gate:
                return await self._execute_task(copy, pinned_agent_id=worker.agent_id)

        parallel_results = await asyncio.gather(
            *(run_copy(i, worker) for i, worker in enumerate(workers[:max(1, max_workers)]))
        )

        # Collect outputs
        agent_outputs = {}
        agent_weights = {}
        for result in parallel_results:
            if result["status"] == "completed":
                agent_id = result["agent_id"]
                agent_outputs[agent_id] = result.get("result", "")
                agent_weights[agent_id] = 1.0

        if not agent_outputs:
            return {
                "status": "failed",
                "error": "All workers failed",
                "worker_count": len(parallel_results),
            }

        # Resolve conflicts
        resolution = self._resolver.resolve(agent_outputs, agent_weights)

        # Reviewers audit
        review_requests = [
            self._bus.request(
                SwarmMessage(
                    message_id=str(uuid.uuid4()),
                    from_agent="coordinator",
                    to_agent=reviewer.agent_id,
                    topic="review",
                    payload={
                        "task": task.description,
                        "resolution": resolution,
                        "worker_outputs": agent_outputs,
                    },
                ),
                timeout=15.0,
            )
            for reviewer in reviewers[:max(0, reviewer_count)]
        ]
        replies = await asyncio.gather(*review_requests)
        # None means "no reply"; falsy answers such as False or "" are kept.
        reviewer_notes = [note for note in replies if note is not None]

        return {
            "status": "completed",
            "output": resolution["output"],
            "resolution_method": resolution.get("method", "unknown"),
            "conflict_detected": resolution.get("conflict", False),
            "worker_count": len(parallel_results),
            "reviewer_notes": reviewer_notes,
        }

    # ── Swarm Communication ──

    async def broadcast(self, from_agent: str, topic: str, payload: Any):
        """Broadcast a message to all agents."""
        await self._bus.publish(SwarmMessage(
            message_id=str(uuid.uuid4()),
            from_agent=from_agent,
            topic=topic,
            payload=payload,
        ))

    async def whisper(self, from_agent: str, to_agent: str, payload: Any):
        """Send a direct message to a specific agent."""
        await self._bus.publish(SwarmMessage(
            message_id=str(uuid.uuid4()),
            from_agent=from_agent,
            to_agent=to_agent,
            payload=payload,
        ))

    # ── Health Monitoring ──

    async def start(self):
        """Start the swarm coordinator and its health monitor (idempotent)."""
        self._started = True
        # A second start() must not spawn a second monitor loop.
        if self._health_task is None or self._health_task.done():
            self._health_task = asyncio.create_task(self._health_monitor_loop())

    async def stop(self):
        """Stop the swarm coordinator and cancel the health monitor."""
        self._started = False
        health_task, self._health_task = self._health_task, None
        if health_task:
            health_task.cancel()
            try:
                await health_task
            except asyncio.CancelledError:
                pass

    async def _health_monitor_loop(self):
        """Background loop that flags agents whose heartbeat has timed out."""
        while self._started:
            now = time.time()
            # Only agents that were alive until now are handled, so a dead
            # agent is reported once rather than on every interval.
            newly_dead = []
            for agent in self._agents.values():
                if agent.is_alive and now - agent.last_heartbeat > self._heartbeat_timeout:
                    agent.is_alive = False
                    newly_dead.append(agent.agent_id)

            for agent_id in newly_dead:
                await self._handle_dead_agent(agent_id)

            await asyncio.sleep(self._heartbeat_interval)

    async def _handle_dead_agent(self, agent_id: str):
        """Handle a dead agent: release its running tasks and log the event."""
        if agent_id not in self._agents:
            return

        # Put the agent's running tasks back to pending and count a retry.
        for task in self._tasks.values():
            if task.assigned_to == agent_id and task.status == TaskStatus.RUNNING:
                task.status = TaskStatus.PENDING
                task.assigned_to = ""
                task.retry_count += 1

        self._run_history.append({
            "event": "agent_dead",
            "agent_id": agent_id,
            "timestamp": time.time(),
        })

    # ── State & Reporting ──

    def snapshot(self) -> dict[str, Any]:
        """Create a snapshot of swarm state (for checkpoint/resume)."""
        return {
            "timestamp": time.time(),
            "topology": self.topology.value,
            "agents": {
                agent_id: {
                    "role": a.role.value,
                    "capabilities": a.capabilities,
                    "current_load": a.current_load,
                    "is_alive": a.is_alive,
                }
                for agent_id, a in self._agents.items()
            },
            "tasks": {
                tid: {
                    "description": t.description,
                    "status": t.status.value,
                    "assigned_to": t.assigned_to,
                    "priority": t.priority.value,
                }
                for tid, t in self._tasks.items()
            },
        }

    def get_stats(self) -> dict[str, Any]:
        """Get swarm statistics."""
        agents = list(self._agents.values())
        alive_count = sum(1 for a in agents if a.is_alive)
        tasks = list(self._tasks.values())
        completed = [t for t in tasks if t.status == TaskStatus.COMPLETED]

        return {
            "agent_count": len(agents),
            "alive_agents": alive_count,
            "dead_agents": len(agents) - alive_count,
            "by_role": {
                role.value: sum(1 for a in agents if a.role == role)
                for role in AgentRole
            },
            "total_tasks": len(tasks),
            "completed_tasks": len(completed),
            "failed_tasks": sum(1 for t in tasks if t.status == TaskStatus.FAILED),
            "pending_tasks": sum(1 for t in tasks if t.status == TaskStatus.PENDING),
            "avg_task_duration_ms": (
                sum(t.duration_ms for t in completed) / len(completed)
                if completed else 0
            ),
            # Count the full log; get_message_log() defaults to the last 100
            # entries and would cap this figure.
            "total_messages": len(self._bus._message_log),
            "total_runs": len(self._run_history),
        }

    def get_agent(self, agent_id: str) -> Optional[AgentInfo]:
        """Return the registered agent with this ID, or None."""
        return self._agents.get(agent_id)

    def get_task(self, task_id: str) -> Optional[SwarmTask]:
        """Return the known task with this ID, or None."""
        return self._tasks.get(task_id)