Coverage for agentos/swarm/patterns.py: 34%
157 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Enhanced Swarm collaboration patterns.
4Extends the base SwarmCoordinator with broadcast, pipeline, hierarchical,
5and consensus-based collaboration topologies.
6"""
8from __future__ import annotations
10import asyncio
11import time
12from dataclasses import dataclass, field
13from enum import Enum
14from typing import Any, Callable, Optional
16from agentos.swarm.coordinator import SwarmCoordinator
class Topology(Enum):
    """Enumeration of the collaboration shapes a swarm can run in.

    Each member's value is the lowercase string form of its name.
    """

    # One-to-all: leader broadcasts task, all members respond independently.
    BROADCAST = "broadcast"

    # Sequential chain: each member processes output of previous member.
    PIPELINE = "pipeline"

    # Tree structure: leader delegates to sub-leaders who manage sub-teams.
    HIERARCHICAL = "hierarchical"

    # Voting: all members vote on outputs, majority wins.
    CONSENSUS = "consensus"

    # Load-balancing: tasks distributed evenly across members.
    ROUND_ROBIN = "round_robin"
@dataclass
class CollaborationConfig:
    """Tunable settings for one swarm collaboration.

    Every field has a default, so ``CollaborationConfig()`` yields a
    broadcast configuration.
    """

    # Collaboration pattern to run.
    topology: Topology = Topology.BROADCAST

    # Max seconds per member invocation.
    timeout_per_member: float = 60.0

    # Max concurrent member executions (broadcast/consensus).
    max_parallel: int = 5

    # Minimum ratio of members needed for consensus (consensus topology).
    quorum_ratio: float = 0.5

    # Return partial results if some members fail.
    allow_partial_results: bool = True
@dataclass
class MemberResult:
    """Outcome of invoking one swarm member.

    Attributes:
        member_id: Identifier of the member that was invoked.
        success: Whether the invocation completed without raising.
        output: Whatever the member returned; ``None`` by default.
        error: Description of the failure; ``None`` by default.
        latency_ms: Elapsed time of the invocation in milliseconds.
    """

    # Required fields come first so positional construction keeps working.
    member_id: str
    success: bool

    # Optional payload and diagnostics.
    output: Any = None
    error: Optional[str] = None
    latency_ms: float = 0.0
@dataclass
class CollaborationResult:
    """Combined outcome of a whole swarm collaboration.

    Attributes:
        topology: Topology the collaboration was run with.
        member_results: Per-member results.
        aggregated_output: Combined output; its shape depends on the topology.
        total_latency_ms: Elapsed time of the collaboration in milliseconds.
        success_count: Number of successful member results.
        failure_count: Number of failed member results.
    """

    # Required fields.
    topology: Topology
    member_results: list[MemberResult]

    # Summary fields, filled in by whoever builds the result.
    aggregated_output: Any = None
    total_latency_ms: float = 0.0
    success_count: int = 0
    failure_count: int = 0
class SwarmPatterns:
    """
    Higher-order swarm collaboration patterns built on SwarmCoordinator.

    Supports five topologies: broadcast, pipeline, hierarchical, consensus,
    round_robin. The topology is read from the ``CollaborationConfig`` given
    at construction time.
    """

    def __init__(
        self,
        coordinator: SwarmCoordinator,
        config: Optional[CollaborationConfig] = None,
    ):
        """
        Args:
            coordinator: Coordinator used to list members and delegate tasks.
            config: Collaboration settings; defaults to ``CollaborationConfig()``.
        """
        self._coordinator = coordinator
        self._config = config or CollaborationConfig()
        # Index of the next member to serve in the round-robin topology.
        self._rr_cursor = 0

    # ---- Synchronous API ----

    def collaborate(
        self,
        task: str,
        context: Optional[dict[str, Any]] = None,
    ) -> CollaborationResult:
        """
        Execute collaboration using configured topology.

        Args:
            task: The task description to collaborate on.
            context: Optional context dict passed to all members.

        Returns:
            CollaborationResult with individual and aggregated outputs.

        Raises:
            ValueError: If the configured topology has no handler.
        """
        t0 = time.perf_counter()
        topology = self._config.topology

        dispatchers: dict[Topology, Callable] = {
            Topology.BROADCAST: self._broadcast,
            Topology.PIPELINE: self._pipeline,
            Topology.HIERARCHICAL: self._hierarchical,
            Topology.CONSENSUS: self._consensus,
            Topology.ROUND_ROBIN: self._round_robin,
        }

        handler = dispatchers.get(topology)
        if handler is None:
            raise ValueError(f"Unknown topology: {topology}")

        result = handler(task, context)
        result.total_latency_ms = (time.perf_counter() - t0) * 1000
        # A handler may fall back to another pattern (hierarchical falls back
        # to broadcast), so report the topology that was actually requested.
        result.topology = topology
        return result

    @staticmethod
    def _make_result(
        topology: Topology,
        results: list[MemberResult],
        aggregated: Any,
    ) -> CollaborationResult:
        """Build a CollaborationResult, tallying successes and failures."""
        success = sum(1 for r in results if r.success)
        return CollaborationResult(
            topology=topology,
            member_results=results,
            aggregated_output=aggregated,
            success_count=success,
            failure_count=len(results) - success,
        )

    def _broadcast(
        self, task: str, context: Optional[dict] = None
    ) -> CollaborationResult:
        """Broadcast task to all members, collect all responses.

        The aggregated output is the list of outputs of the successful
        members, in member order.
        """
        results = [
            self._invoke_member(member, task, context)
            for member in self._coordinator.list_members()
        ]
        return self._make_result(
            Topology.BROADCAST,
            results,
            [r.output for r in results if r.success],
        )

    def _pipeline(
        self, task: str, context: Optional[dict] = None
    ) -> CollaborationResult:
        """Sequential pipeline: each member processes previous output.

        The aggregated output is the last input that was produced, i.e. the
        string form of the final forwarded output (or the original task if
        nothing was forwarded).
        """
        results: list[MemberResult] = []
        current_input = task

        for member in self._coordinator.list_members():
            m_result = self._invoke_member(member, current_input, context)
            results.append(m_result)
            if m_result.success:
                # Forward any real output, including falsy values such as 0,
                # False or []; only None and empty text leave the input
                # unchanged for the next stage.
                if m_result.output is not None:
                    forwarded = str(m_result.output)
                    if forwarded:
                        current_input = forwarded
            elif not self._config.allow_partial_results:
                # A failed stage aborts the chain unless partial results
                # are allowed.
                break

        return self._make_result(Topology.PIPELINE, results, current_input)

    def _hierarchical(
        self, task: str, context: Optional[dict] = None
    ) -> CollaborationResult:
        """Two-level hierarchy: leader delegates to sub-groups.

        The first half of the members act as sub-leaders and receive a
        decomposition prompt; the rest are workers and receive the
        ``;``-separated parts of the task, assigned cyclically.
        """
        members = self._coordinator.list_members()
        n = len(members)
        if n < 2:
            # Fallback to broadcast for small swarms
            return self._broadcast(task, context)

        # Split members: first half as sub-leaders, rest as workers.
        # n >= 2 here, so there is always at least one sub-leader.
        split = n // 2
        sub_leaders = members[:split]
        workers = members[split:]

        results: list[MemberResult] = []

        # Step 1: Sub-leaders plan task decomposition
        plan_task = f"Decompose this task into sub-tasks for {len(workers)} workers: {task}"
        for leader in sub_leaders:
            results.append(self._invoke_member(leader, plan_task, context))

        # Step 2: Workers execute sub-tasks. Blank fragments (from trailing
        # or repeated ';') are dropped so no worker gets an empty task.
        sub_tasks = [part.strip() for part in task.split(";") if part.strip()]
        if not sub_tasks:
            sub_tasks = [task.strip()]
        for i, worker in enumerate(workers):
            sub_task = sub_tasks[i % len(sub_tasks)]
            results.append(self._invoke_member(worker, sub_task, context))

        return self._make_result(
            Topology.HIERARCHICAL,
            results,
            [r.output for r in results if r.success],
        )

    def _consensus(
        self, task: str, context: Optional[dict] = None
    ) -> CollaborationResult:
        """Voting: all members vote, majority output wins.

        Outputs are compared by their string form. The most-voted output
        wins if its share of ALL members reaches ``quorum_ratio``; ties go
        to the output seen first. The aggregated output is the winning
        string, or ``"No consensus reached"``.
        """
        members = self._coordinator.list_members()
        results: list[MemberResult] = []
        votes: dict[str, int] = {}

        for member in members:
            m_result = self._invoke_member(member, task, context)
            results.append(m_result)
            if m_result.success and m_result.output is not None:
                key = str(m_result.output)
                votes[key] = votes.get(key, 0) + 1

        winner: Optional[str] = None
        if votes:
            # max() returns the first maximal key in insertion order, which
            # makes tie-breaking deterministic.
            candidate = max(votes, key=lambda k: votes[k])
            # Compare the vote share directly instead of truncating
            # n * ratio to an int, which would under-state the quorum.
            if votes[candidate] / len(members) >= self._config.quorum_ratio:
                winner = candidate

        # `is None` rather than truthiness: "" is a valid winning output.
        aggregated = winner if winner is not None else "No consensus reached"
        return self._make_result(Topology.CONSENSUS, results, aggregated)

    def _round_robin(
        self, task: str, context: Optional[dict] = None
    ) -> CollaborationResult:
        """Load-balancing: hand the task to the next member in rotation.

        The rotation cursor is kept on this SwarmPatterns instance, so
        successive calls cycle through the members in list order.
        """
        members = self._coordinator.list_members()
        if not members:
            return self._make_result(Topology.ROUND_ROBIN, [], None)

        # Modulo keeps the cursor valid if the member list changes size.
        # NOTE: the cursor is not lock-protected; concurrent callers may
        # occasionally pick the same member.
        member = members[self._rr_cursor % len(members)]
        self._rr_cursor += 1

        m_result = self._invoke_member(member, task, context)
        return self._make_result(
            Topology.ROUND_ROBIN, [m_result], m_result.output
        )

    def _invoke_member(
        self, member_id: str, task: str, context: Optional[dict] = None
    ) -> MemberResult:
        """Invoke a single swarm member and capture the outcome.

        Any exception raised by the coordinator becomes a failed
        MemberResult instead of propagating.

        NOTE: ``timeout_per_member`` is not enforced here; the call blocks
        until ``delegate`` returns or raises.

        Args:
            member_id: Member to delegate to.
            task: Task text passed to the member.
            context: Optional context; an empty dict is passed when omitted.

        Returns:
            MemberResult with the output or error text and the elapsed time
            in milliseconds.
        """
        t0 = time.perf_counter()
        try:
            output = self._coordinator.delegate(
                member_id=member_id,
                task=task,
                context=context or {},
            )
        except Exception as exc:
            # Deliberately broad: one failing member must not abort the
            # whole collaboration; the failure is recorded on the result.
            return MemberResult(
                member_id=member_id,
                success=False,
                error=f"{type(exc).__name__}: {exc}",
                latency_ms=(time.perf_counter() - t0) * 1000,
            )
        return MemberResult(
            member_id=member_id,
            success=True,
            output=output,
            latency_ms=(time.perf_counter() - t0) * 1000,
        )

    # ---- Async API ----

    async def collaborate_async(
        self,
        task: str,
        context: Optional[dict[str, Any]] = None,
    ) -> CollaborationResult:
        """Async version of ``collaborate``.

        The broadcast topology invokes members concurrently in worker
        threads, at most ``max_parallel`` at a time. Every other topology
        runs the synchronous ``collaborate`` in a worker thread and returns
        its result unchanged apart from the measured latency.

        Args:
            task: The task description to collaborate on.
            context: Optional context dict passed to all members.

        Returns:
            CollaborationResult with individual and aggregated outputs.
        """
        t0 = time.perf_counter()

        if self._config.topology != Topology.BROADCAST:
            # Keep the aggregated output computed by the sync path (the
            # consensus winner, the pipeline's final output, ...).
            result = await asyncio.to_thread(self.collaborate, task, context)
            result.total_latency_ms = (time.perf_counter() - t0) * 1000
            return result

        members = self._coordinator.list_members()
        # Semaphore(0) would block every member forever, so allow at
        # least one concurrent invocation.
        semaphore = asyncio.Semaphore(max(1, self._config.max_parallel))

        async def run_one(member_id: str) -> MemberResult:
            async with semaphore:
                return await asyncio.to_thread(
                    self._invoke_member, member_id, task, context
                )

        # gather() preserves input order, so results line up with members.
        member_results = list(
            await asyncio.gather(*(run_one(m) for m in members))
        )

        result = self._make_result(
            Topology.BROADCAST,
            member_results,
            [r.output for r in member_results if r.success],
        )
        result.total_latency_ms = (time.perf_counter() - t0) * 1000
        return result