Coverage for agentos/swarm/patterns.py: 34%

157 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Enhanced Swarm collaboration patterns. 

3 

4Extends the base SwarmCoordinator with broadcast, pipeline, hierarchical, 

5and consensus-based collaboration topologies. 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import time 

12from dataclasses import dataclass, field 

13from enum import Enum 

14from typing import Any, Callable, Optional 

15 

16from agentos.swarm.coordinator import SwarmCoordinator 

17 

18 

class Topology(Enum):
    """Swarm collaboration topology.

    Members:
        BROADCAST: One-to-all. The leader broadcasts the task and all
            members respond independently.
        PIPELINE: Sequential chain. Each member processes the output of
            the previous member.
        HIERARCHICAL: Tree structure. The leader delegates to sub-leaders
            who manage sub-teams.
        CONSENSUS: Voting. All members vote on outputs and the majority
            wins.
        ROUND_ROBIN: Load-balancing. Tasks are distributed evenly across
            members.
    """

    BROADCAST = "broadcast"
    PIPELINE = "pipeline"
    HIERARCHICAL = "hierarchical"
    CONSENSUS = "consensus"
    ROUND_ROBIN = "round_robin"

36 

37 

@dataclass
class CollaborationConfig:
    """Configuration for swarm collaboration.

    Attributes:
        topology: Collaboration topology to run; defaults to broadcast.
        timeout_per_member: Max seconds per member invocation.
        max_parallel: Max concurrent member executions
            (broadcast/consensus).
        quorum_ratio: Minimum ratio of members needed for consensus
            (consensus topology).
        allow_partial_results: Return partial results if some members fail.
    """

    topology: Topology = Topology.BROADCAST
    timeout_per_member: float = 60.0
    max_parallel: int = 5
    quorum_ratio: float = 0.5
    allow_partial_results: bool = True

54 

55 

@dataclass
class MemberResult:
    """Outcome of invoking one swarm member.

    Attributes:
        member_id: Identifier of the member that was invoked.
        success: Whether the invocation completed without raising.
        output: Value produced by the member; ``None`` when absent.
        error: Error description for a failed invocation, else ``None``.
        latency_ms: Wall-clock duration of the invocation in milliseconds.
    """

    member_id: str
    success: bool
    output: Any = None
    error: Optional[str] = None
    latency_ms: float = 0.0

65 

66 

@dataclass
class CollaborationResult:
    """Aggregated outcome of one swarm collaboration run.

    Attributes:
        topology: Topology the result was produced under.
        member_results: Per-member results, one entry per invocation.
        aggregated_output: Combined output; its shape depends on the
            topology that produced it.
        total_latency_ms: Wall-clock duration of the whole run in
            milliseconds.
        success_count: Number of successful member invocations.
        failure_count: Number of failed member invocations.
    """

    topology: Topology
    member_results: list[MemberResult]
    aggregated_output: Any = None
    total_latency_ms: float = 0.0
    success_count: int = 0
    failure_count: int = 0

77 

78 

class SwarmPatterns:
    """
    Higher-order swarm collaboration patterns built on SwarmCoordinator.

    Supports five topologies: broadcast, pipeline, hierarchical, consensus,
    round_robin. The topology is read from the CollaborationConfig on every
    call. Every member invocation goes through ``_invoke_member``, so a
    member that raises is reported as a failed MemberResult rather than
    aborting the collaboration.
    """

    def __init__(
        self,
        coordinator: SwarmCoordinator,
        config: Optional[CollaborationConfig] = None,
    ):
        """
        Args:
            coordinator: Coordinator used to list members and delegate tasks.
            config: Collaboration settings; a default CollaborationConfig is
                created when omitted.
        """
        self._coordinator = coordinator
        self._config = config or CollaborationConfig()
        # Index of the next member to serve in the round_robin topology.
        self._rr_cursor = 0

    # ---- Synchronous API ----

    def collaborate(
        self,
        task: str,
        context: Optional[dict[str, Any]] = None,
    ) -> CollaborationResult:
        """
        Execute collaboration using configured topology.

        Args:
            task: The task description to collaborate on.
            context: Optional context dict passed to all members.

        Returns:
            CollaborationResult with individual and aggregated outputs.

        Raises:
            ValueError: If the configured topology has no handler.
        """
        t0 = time.perf_counter()
        topology = self._config.topology

        dispatchers: dict[Topology, Callable] = {
            Topology.BROADCAST: self._broadcast,
            Topology.PIPELINE: self._pipeline,
            Topology.HIERARCHICAL: self._hierarchical,
            Topology.CONSENSUS: self._consensus,
            Topology.ROUND_ROBIN: self._round_robin,
        }

        handler = dispatchers.get(topology)
        if handler is None:
            raise ValueError(f"Unknown topology: {topology}")

        result = handler(task, context)
        result.total_latency_ms = (time.perf_counter() - t0) * 1000
        # A handler may fall back to another one (hierarchical -> broadcast),
        # so always report the topology that was actually requested.
        result.topology = topology
        return result

    @staticmethod
    def _summarize(
        topology: Topology,
        results: list[MemberResult],
        aggregated_output: Any,
    ) -> CollaborationResult:
        """Build a CollaborationResult, deriving the success/failure counts."""
        success = sum(1 for r in results if r.success)
        return CollaborationResult(
            topology=topology,
            member_results=results,
            aggregated_output=aggregated_output,
            success_count=success,
            failure_count=len(results) - success,
        )

    def _broadcast(
        self, task: str, context: Optional[dict] = None
    ) -> CollaborationResult:
        """Broadcast task to all members, collect all responses.

        The aggregated output is the list of outputs of the successful
        members, in member order.
        """
        results = [
            self._invoke_member(member, task, context)
            for member in self._coordinator.list_members()
        ]
        return self._summarize(
            Topology.BROADCAST,
            results,
            [r.output for r in results if r.success],
        )

    def _pipeline(
        self, task: str, context: Optional[dict] = None
    ) -> CollaborationResult:
        """Sequential pipeline: each member processes previous output.

        The aggregated output is the input that would be handed to the next
        stage, i.e. the stringified output of the last successful member
        that produced a truthy output (or the original task if none did).
        """
        results: list[MemberResult] = []
        current_input = task

        for member in self._coordinator.list_members():
            m_result = self._invoke_member(member, current_input, context)
            results.append(m_result)
            if m_result.success:
                # A falsy output (None, "", 0, ...) leaves the input as is,
                # so the next stage still receives something to work on.
                if m_result.output:
                    current_input = str(m_result.output)
            elif not self._config.allow_partial_results:
                # Strict mode: stop the chain at the first failing stage.
                break

        return self._summarize(Topology.PIPELINE, results, current_input)

    def _hierarchical(
        self, task: str, context: Optional[dict] = None
    ) -> CollaborationResult:
        """Two-level hierarchy: leader delegates to sub-groups.

        The first half of the members act as sub-leaders and receive a
        decomposition prompt; the rest act as workers and receive the
        ``;``-separated parts of ``task``, cycling when there are more
        workers than parts.

        NOTE(review): the sub-leaders' plans are recorded in the result but
        are not used to derive the workers' sub-tasks.
        """
        members = self._coordinator.list_members()
        n = len(members)
        if n < 2:
            # Fallback to broadcast for small swarms
            return self._broadcast(task, context)

        # Split members: first half as sub-leaders, rest as workers
        split = max(1, n // 2)
        sub_leaders, workers = members[:split], members[split:]

        # Step 1: Sub-leaders plan task decomposition
        plan_task = f"Decompose this task into sub-tasks for {len(workers)} workers: {task}"
        results: list[MemberResult] = [
            self._invoke_member(leader, plan_task, context)
            for leader in sub_leaders
        ]

        # Step 2: Workers execute sub-tasks. Blank fragments (from ";;" or a
        # trailing ";") are dropped so no worker is handed an empty task.
        pieces = [p for p in (s.strip() for s in task.split(";")) if p]
        sub_tasks = pieces or [task.strip()]
        results.extend(
            self._invoke_member(worker, sub_tasks[i % len(sub_tasks)], context)
            for i, worker in enumerate(workers)
        )

        return self._summarize(
            Topology.HIERARCHICAL,
            results,
            [r.output for r in results if r.success],
        )

    def _consensus(
        self, task: str, context: Optional[dict] = None
    ) -> CollaborationResult:
        """Voting: all members vote, majority output wins."""
        results = [
            self._invoke_member(member, task, context)
            for member in self._coordinator.list_members()
        ]
        return self._consensus_result(results)

    def _consensus_result(
        self, results: list[MemberResult]
    ) -> CollaborationResult:
        """Tally member outputs and pick the most-voted one if it has quorum.

        Votes are keyed by ``str(output)``; failed members and ``None``
        outputs do not vote. The aggregated output is the winning key, or
        the string ``"No consensus reached"`` when no output reaches quorum.
        Ties are resolved in favour of the output that was voted for first.
        """
        votes: dict[str, int] = {}
        for r in results:
            if r.success and r.output is not None:
                key = str(r.output)
                votes[key] = votes.get(key, 0) + 1

        # One result per member, so len(results) is the swarm size.
        quorum = max(1, int(len(results) * self._config.quorum_ratio))

        winner: Optional[str] = None
        if votes:
            # max() keeps the first maximal entry, i.e. insertion order on ties.
            top_key, top_count = max(votes.items(), key=lambda kv: kv[1])
            if top_count >= quorum:
                winner = top_key

        # Compare against None: an empty-string winner is still a winner.
        aggregated = winner if winner is not None else "No consensus reached"
        return self._summarize(Topology.CONSENSUS, results, aggregated)

    def _round_robin(
        self, task: str, context: Optional[dict] = None
    ) -> CollaborationResult:
        """Load-balancing: hand the task to the next member in rotation.

        A per-instance cursor advances by one on every call, so successive
        tasks are spread across the members. The update is not atomic;
        concurrent callers may occasionally pick the same member.
        """
        members = self._coordinator.list_members()
        if not members:
            return self._summarize(Topology.ROUND_ROBIN, [], None)

        # getattr keeps instances created without __init__ working.
        cursor = getattr(self, "_rr_cursor", 0)
        member = members[cursor % len(members)]
        self._rr_cursor = cursor + 1

        m_result = self._invoke_member(member, task, context)
        return self._summarize(
            Topology.ROUND_ROBIN, [m_result], m_result.output
        )

    def _invoke_member(
        self, member_id: str, task: str, context: Optional[dict] = None
    ) -> MemberResult:
        """Invoke a single swarm member and time the call.

        Any exception raised by the coordinator is captured and returned as
        a failed MemberResult whose ``error`` is ``"<ExcType>: <message>"``.

        NOTE(review): ``timeout_per_member`` is not enforced here; the call
        blocks for as long as ``delegate`` does.
        """
        t0 = time.perf_counter()
        try:
            output = self._coordinator.delegate(
                member_id=member_id,
                task=task,
                context=context or {},
            )
        except Exception as exc:
            # Boundary catch: one misbehaving member must not abort the
            # whole collaboration, so the failure is reported as data.
            return MemberResult(
                member_id=member_id,
                success=False,
                error=f"{type(exc).__name__}: {exc}",
                latency_ms=(time.perf_counter() - t0) * 1000,
            )
        return MemberResult(
            member_id=member_id,
            success=True,
            output=output,
            latency_ms=(time.perf_counter() - t0) * 1000,
        )

    # ---- Async API ----

    async def collaborate_async(
        self,
        task: str,
        context: Optional[dict[str, Any]] = None,
    ) -> CollaborationResult:
        """Async version using asyncio for broadcast/consensus topologies.

        Broadcast and consensus invoke members concurrently in worker
        threads, at most ``max_parallel`` at a time. Every other topology is
        inherently sequential, so the synchronous ``collaborate`` is run in
        a worker thread and its result (including its aggregated output) is
        returned unchanged apart from the total latency.

        Args:
            task: The task description to collaborate on.
            context: Optional context dict passed to all members.

        Returns:
            CollaborationResult with individual and aggregated outputs.
        """
        t0 = time.perf_counter()
        topology = self._config.topology

        if topology not in (Topology.BROADCAST, Topology.CONSENSUS):
            result = await asyncio.to_thread(self.collaborate, task, context)
            result.total_latency_ms = (time.perf_counter() - t0) * 1000
            return result

        members = self._coordinator.list_members()
        # Semaphore(0) would block forever and a negative value raises, so
        # always allow at least one invocation at a time.
        semaphore = asyncio.Semaphore(max(1, self._config.max_parallel))

        async def run_one(member_id: str) -> MemberResult:
            async with semaphore:
                return await asyncio.to_thread(
                    self._invoke_member, member_id, task, context
                )

        # gather() preserves input order, so results stay in member order.
        member_results = list(
            await asyncio.gather(*[run_one(m) for m in members])
        )

        if topology == Topology.CONSENSUS:
            result = self._consensus_result(member_results)
        else:
            result = self._summarize(
                Topology.BROADCAST,
                member_results,
                [r.output for r in member_results if r.success],
            )
        result.total_latency_ms = (time.perf_counter() - t0) * 1000
        return result