Coverage for agentos/subagent/collaboration.py: 34%

205 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Agent 协作模式 — Debate/Vote/Review/Pipeline/Ensemble。 

3基于 SubAgentManager + 父子通信之上,提供高级多Agent协作原语。 

4 

5使用示例:: 

6 

7 mgr = SubAgentManager() 

8 collab = AgentCollaboration(mgr) 

9 

10 result = await collab.debate("Python vs Rust for web backend", agents=2) 

11 result = await collab.vote(["方案A", "方案B", "方案C"], agents=5) 

12 result = await collab.review("写一篇关于AI安全的文章", rounds=2) 

13 result = await collab.pipeline("分析Q2财报数据", stages=3) 

14 result = await collab.ensemble("设计系统架构方案", agents=3) 

15""" 

16 

17from __future__ import annotations 

18 

19import asyncio 

20import re 

21import time 

22import uuid 

23from dataclasses import dataclass, field 

24from enum import Enum 

25from typing import Any, Callable, Awaitable 

26 

27from .manager import SubAgentManager, SubAgentSpec, SubAgentResult 

28from .parent_child import ChildStatus, ChildContext, SharedState, ChildHandle 

29 

30 

31# ────────────────────────────────────────────── 

32# 枚举与数据类型 

33# ────────────────────────────────────────────── 

34 

35 

class CollaborationMode(str, Enum):
    """Collaboration workflows offered by the engine.

    Members subclass ``str``, so each one compares equal to its lowercase
    string value.
    """

    DEBATE = "debate"
    VOTE = "vote"
    REVIEW = "review"
    PIPELINE = "pipeline"
    ENSEMBLE = "ensemble"

42 

43 

class VoteStrategy(str, Enum):
    """Ballot-counting strategies accepted by ``AgentCollaboration.vote``.

    Members subclass ``str``, so each one compares equal to its lowercase
    string value.
    """

    MAJORITY = "majority"    # most ballots wins
    WEIGHTED = "weighted"    # highest summed confidence wins
    RANKED = "ranked"        # NOTE(review): vote() has no dedicated branch; it is counted like MAJORITY
    UNANIMOUS = "unanimous"  # every requested agent must pick the same option

49 

50 

@dataclass
class DebateRound:
    """One round of a debate."""

    # Zero-based round index; round 0 holds the opening arguments.
    round: int
    # One argument per debater.
    arguments: list[str]
    # One rebuttal per debater; empty for the opening round.
    rebuttals: list[str]
    # Index of the winning debater, if one was determined.
    winner: int | None = None

58 

59 

@dataclass
class VoteBallot:
    """A single ballot cast by one agent."""

    # Identifier of the agent that cast the ballot.
    agent_id: str
    # Text of the option that was chosen.
    choice: str
    # Self-reported confidence of the voter.
    confidence: float = 1.0
    # Free-text justification supplied by the voter.
    reasoning: str = ""

67 

68 

@dataclass
class ReviewPass:
    """One review iteration: a draft, the feedback on it, and the revision."""

    # One-based iteration number.
    round: int
    # Draft that was handed to the reviewer.
    draft: str
    # Raw reviewer output.
    feedback: str
    # Writer's revised text produced from the feedback.
    revised: str
    # Reviewer score for the draft.
    score: float = 0.0

77 

78 

@dataclass
class CollaborationResult:
    """Outcome of one collaboration run."""

    # Workflow that produced this result.
    mode: CollaborationMode
    # Identifiers of the agents that took part.
    agents: list[str]
    # Number of rounds / stages reported by the workflow.
    rounds: int
    # Final text produced by the workflow.
    final_output: str
    # Per-round artefacts; the element type depends on the workflow.
    intermediate: list[Any] = field(default_factory=list)
    # Agreement / quality estimate computed by the workflow.
    consensus: float = 0.0
    # Wall-clock duration in seconds.
    duration: float = 0.0

89 

90 

91# ────────────────────────────────────────────── 

92# 角色性格库 

93# ────────────────────────────────────────────── 

94 

# Persona instructions inserted into agent prompts. The workflows index this
# list with ``i % len(_PERSONAS)``, so personas repeat cyclically when there
# are more agents than entries.
_PERSONAS = [
    "Be logical, data-driven, and cite evidence.",
    "Be creative, think outside the box, challenge assumptions.",
    "Focus on practical concerns: cost, timeline, feasibility.",
    "Advocate for user experience and human-centered design.",
    "Take a contrarian stance, find flaws in all arguments.",
]

102 

103# ────────────────────────────────────────────── 

104# 核心协作引擎 

105# ────────────────────────────────────────────── 

106 

107 

class AgentCollaboration:
    """Multi-agent collaboration engine.

    Offers debate, vote, review, pipeline and ensemble workflows, each built
    from the manager's ``spawn_fork`` (single agent) and ``spawn_swarm``
    (parallel agents) calls.

    Args:
        manager: ``SubAgentManager`` to spawn agents with; a fresh one is
            created when omitted.
        run_func: Coroutine function forwarded unchanged as ``run_func`` to
            every spawn call. Annotated as taking ``(SubAgentSpec,
            ChildContext)`` and returning an ``(output, iterations)`` tuple.
        default_timeout: Per-agent timeout in seconds, used whenever a call
            does not supply its own.
    """

    def __init__(
        self,
        manager: SubAgentManager | None = None,
        run_func: Callable[[SubAgentSpec, ChildContext], Awaitable[tuple[str, int]]] | None = None,
        default_timeout: float | None = 300.0,
    ):
        self._mgr = manager or SubAgentManager()
        self._run = run_func
        self._timeout = default_timeout

    # ── Convenience accessors ───────────────

    @property
    def manager(self) -> SubAgentManager:
        """The underlying sub-agent manager."""
        return self._mgr

    @property
    def shared_state(self) -> SharedState:
        """The manager's ``shared_state`` object."""
        return self._mgr.shared_state

    async def cancel_all(self) -> None:
        """Delegate to the manager's ``cancel_all``."""
        await self._mgr.cancel_all()

    @property
    def active_agents(self) -> int:
        """The manager's ``active_children`` value."""
        return self._mgr.active_children

    # ── Internal helpers ────────────────────

    async def _spawn(self, task: str, timeout: float | None = None) -> SubAgentResult:
        """Run ``task`` on a single forked agent.

        A falsy ``timeout`` (``None`` or ``0``) falls back to the default
        timeout given to the constructor.
        """
        return await self._mgr.spawn_fork(
            task=task,
            run_func=self._run,
            timeout=timeout or self._timeout,
        )

    async def _spawn_many(self, tasks: list[str], timeout: float | None = None) -> list[SubAgentResult]:
        """Run ``tasks`` in parallel as a swarm.

        A falsy ``timeout`` (``None`` or ``0``) falls back to the default
        timeout given to the constructor.
        """
        return await self._mgr.spawn_swarm(
            tasks=tasks,
            run_func=self._run,
            timeout=timeout or self._timeout,
        )

    @staticmethod
    def _parse_score(output: str) -> float:
        """Extract a ``SCORE: N`` rating and normalise it to ``0.0-1.0``.

        The first ``SCORE:`` followed by a number is used (this also matches
        ``SELF_SCORE:``). The number is clamped to ``0-10`` and divided by 10.
        Returns ``0.7`` when no score is present.
        """
        # Capture exactly one well-formed decimal. A looser ``[\d.]+`` would
        # also swallow sentence punctuation ("8.5.") and make float() raise.
        m = re.search(r'SCORE:\s*(\d+(?:\.\d+)?|\.\d+)', output, re.IGNORECASE)
        if m:
            return max(0.0, min(10.0, float(m.group(1)))) / 10.0
        return 0.7

    @staticmethod
    def _parse_choice(
        output: str, option_count: int
    ) -> tuple[int | None, float, str]:
        """Parse ``CHOICE: N | CONFIDENCE: X | REASONING: text`` from ``output``.

        Returns:
            ``(choice, confidence, reasoning)`` where ``choice`` is the
            1-based option number, or ``None`` when it is missing or outside
            ``1..option_count``; ``confidence`` is clamped to ``0.0-1.0``
            (``0.5`` when absent); ``reasoning`` is at most 200 characters and
            defaults to the start of ``output``.
        """
        choice = None
        confidence = 0.5
        reasoning = output[:200]

        m_choice = re.search(r'CHOICE:\s*(\d+)', output, re.IGNORECASE)
        if m_choice:
            num = int(m_choice.group(1))
            if 1 <= num <= option_count:
                choice = num

        # Same well-formed-decimal capture as _parse_score, so a trailing
        # period after the number cannot make float() raise.
        m_conf = re.search(r'CONFIDENCE:\s*(\d+(?:\.\d+)?|\.\d+)', output, re.IGNORECASE)
        if m_conf:
            confidence = max(0.0, min(1.0, float(m_conf.group(1))))

        # Lazy match: the reasoning ends at the first newline or end of text.
        m_reason = re.search(r'REASONING:\s*(.+?)(?:\n|$)', output, re.IGNORECASE | re.DOTALL)
        if m_reason:
            reasoning = m_reason.group(1).strip()[:200]

        return choice, confidence, reasoning

    # ══════════════════════════════════════════
    # 1. Debate
    # ══════════════════════════════════════════

    async def debate(
        self,
        topic: str,
        agents: int = 2,
        rounds: int = 3,
        timeout: float | None = None,
    ) -> CollaborationResult:
        """Let several agents debate ``topic``, then have a judge summarise.

        Flow:
            Round 0: every debater presents an opening argument.
            Rounds 1..rounds-1: every debater rebuts the opponents' most
                recent statements and strengthens its own position.
            Judge: one extra agent synthesises the opening arguments and
                final rebuttals into a verdict.

        Args:
            topic: Subject of the debate.
            agents: Number of debaters.
            rounds: Number of debate rounds, including the opening round.
            timeout: Per-agent timeout in seconds; falls back to the default.

        Returns:
            Result whose ``final_output`` is the judge's verdict,
            ``intermediate`` is the list of ``DebateRound`` objects and
            ``rounds`` is ``rounds + 1`` (the judge counts as one).
        """
        t0 = time.time()
        history: list[DebateRound] = []
        all_agent_ids: list[str] = []

        # Round 0: opening arguments.
        r0_tasks = [
            f"Debate topic: {topic}\n"
            f"You are debater {chr(65+i)}. {_PERSONAS[i % len(_PERSONAS)]}\n"
            f"Present your opening argument."
            for i in range(agents)
        ]
        r0_results = await self._spawn_many(r0_tasks, timeout)
        r0_args = [r.output for r in r0_results]
        history.append(DebateRound(round=0, arguments=r0_args, rebuttals=[]))
        all_agent_ids.extend(r.agent_id for r in r0_results)

        # Rounds 1..N: rebut and reinforce. Each round answers what the
        # opponents said most recently, so successive rounds build on each
        # other instead of re-rebutting the opening arguments.
        # Assumes swarm results come back in task order - TODO confirm.
        latest = r0_args
        for rnd in range(1, rounds):
            rebut_tasks = []
            for i in range(agents):
                # enumerate() tolerates a swarm that returned fewer results
                # than requested instead of raising IndexError.
                opponent_args = [
                    statement
                    for j, statement in enumerate(latest) if j != i
                ]
                rebut_tasks.append(
                    f"Debate topic: {topic}\n"
                    f"You are debater {chr(65+i)}. {_PERSONAS[i % len(_PERSONAS)]}\n"
                    f"Opponent arguments: {' | '.join(opponent_args)}\n"
                    f"Provide your rebuttal and strengthen your position."
                )
            rebut_results = await self._spawn_many(rebut_tasks, timeout)
            rebuttals = [r.output for r in rebut_results]
            history.append(DebateRound(
                round=rnd,
                # Each round records a copy of the opening arguments.
                arguments=list(history[-1].arguments),
                rebuttals=rebuttals,
            ))
            all_agent_ids.extend(r.agent_id for r in rebut_results)
            latest = rebuttals

        # Judge summary: opening argument plus final rebuttal per debater.
        opening = history[0].arguments
        closing = history[-1].rebuttals
        all_args = "\n\n".join(
            f"Debater {chr(65+i)} initial: {opening[i] if i < len(opening) else 'N/A'}\n"
            f"Debater {chr(65+i)} final rebuttal: "
            f"{closing[i] if i < len(closing) else 'N/A'}"
            for i in range(agents)
        )
        judge = await self._spawn(
            f"As an impartial judge, synthesize this debate on '{topic}' and give your verdict:\n"
            f"{all_args}",
            timeout,
        )
        all_agent_ids.append(judge.agent_id)

        return CollaborationResult(
            mode=CollaborationMode.DEBATE,
            agents=all_agent_ids,
            rounds=rounds + 1,
            final_output=judge.output,
            intermediate=history,
            # Heuristic: grows with the number of debaters, capped at 1.0.
            consensus=0.5 + 0.1 * min(agents, 5),
            duration=time.time() - t0,
        )

    # ══════════════════════════════════════════
    # 2. Vote
    # ══════════════════════════════════════════

    async def vote(
        self,
        options: list[str],
        agents: int = 3,
        strategy: VoteStrategy = VoteStrategy.MAJORITY,
        timeout: float | None = None,
    ) -> CollaborationResult:
        """Have several agents vote for the best option.

        Args:
            options: Candidate options.
            agents: Number of voting agents.
            strategy: How ballots are counted. ``WEIGHTED`` sums confidences,
                ``UNANIMOUS`` requires every requested agent to cast a valid
                ballot for the same option, and every other value (including
                ``RANKED``) is counted as a simple majority.
            timeout: Per-agent timeout in seconds; falls back to the default.

        Returns:
            Result whose ``final_output`` is a text report and whose
            ``intermediate`` is the list of valid ``VoteBallot`` objects.
            Ballots whose choice cannot be parsed are discarded.
        """
        t0 = time.time()
        option_list = "\n".join(f"{i+1}. {opt}" for i, opt in enumerate(options))
        ballots: list[VoteBallot] = []

        vote_tasks = [
            f"You are voter {i+1}/{agents}. {_PERSONAS[i % len(_PERSONAS)]}\n"
            f"Evaluate these options and vote for ONE:\n{option_list}\n"
            f"Format: CHOICE: <number> | CONFIDENCE: <0.0-1.0> | REASONING: <text>"
            for i in range(agents)
        ]
        results = await self._spawn_many(vote_tasks, timeout)

        for r in results:
            c, conf, reason = self._parse_choice(r.output, len(options))
            if c is not None:
                ballots.append(VoteBallot(
                    agent_id=r.agent_id,
                    choice=options[c - 1],
                    confidence=conf,
                    reasoning=reason,
                ))

        if not ballots:
            return CollaborationResult(
                mode=CollaborationMode.VOTE,
                agents=[],
                rounds=1,
                final_output="No valid votes cast.",
                consensus=0.0,
                duration=time.time() - t0,
            )

        _tally: dict[str, int] = {}
        _weighted: dict[str, float] = {}
        for b in ballots:
            _tally[b.choice] = _tally.get(b.choice, 0) + 1
            _weighted[b.choice] = _weighted.get(b.choice, 0) + b.confidence

        if strategy == VoteStrategy.WEIGHTED:
            winner = max(_weighted, key=_weighted.get)
            total_weight = sum(_weighted.values())
            # Every ballot may carry confidence 0.0; avoid dividing by zero.
            consensus = _weighted[winner] / total_weight if total_weight else 0.0
            summary = f"Weighted winner: {winner} (score: {_weighted[winner]:.2f})"
        elif strategy == VoteStrategy.UNANIMOUS:
            # A single tallied option chosen by all ballots, with one valid
            # ballot per requested agent.
            if len(_tally) == 1 and len(ballots) == agents:
                winner, consensus = next(iter(_tally)), 1.0
                summary = f"Unanimous: {winner}"
            else:
                winner, consensus = "NO CONSENSUS", 0.0
                summary = "Unanimous vote FAILED"
        else:
            # MAJORITY, and any strategy without a dedicated branch (RANKED).
            winner = max(_tally, key=_tally.get)
            consensus = _tally[winner] / len(ballots)
            summary = f"Majority winner: {winner} ({_tally[winner]}/{len(ballots)} votes)"

        report = f"{summary}\n\nVote details:\n" + "".join(
            f"- [{b.agent_id}] '{b.choice}' (conf={b.confidence:.2f}): {b.reasoning}\n"
            for b in ballots
        )

        return CollaborationResult(
            mode=CollaborationMode.VOTE,
            agents=[b.agent_id for b in ballots],
            rounds=1,
            final_output=report,
            intermediate=ballots,
            consensus=consensus,
            duration=time.time() - t0,
        )

    # ══════════════════════════════════════════
    # 3. Review
    # ══════════════════════════════════════════

    async def review(
        self,
        task: str,
        rounds: int = 2,
        timeout: float | None = None,
    ) -> CollaborationResult:
        """Writer drafts, reviewer critiques, writer revises - repeated.

        Flow:
            1. A writer agent produces the first draft.
            2. A reviewer agent scores the draft and gives feedback.
            3. A writer agent revises the draft using that feedback.
            4. Steps 2-3 repeat ``rounds`` times.

        Args:
            task: Writing task for the writer.
            rounds: Number of review/revise iterations.
            timeout: Per-agent timeout in seconds; falls back to the default.

        Returns:
            Result whose ``final_output`` is the last revision,
            ``intermediate`` is the list of ``ReviewPass`` objects and
            ``consensus`` is the score from the final review (``0.0`` when
            ``rounds`` is 0).
        """
        t0 = time.time()
        passes: list[ReviewPass] = []
        # NOTE(review): these are freshly generated labels, not the agent_id
        # values of the agents actually spawned below.
        writer_id = uuid.uuid4().hex[:8]
        reviewer_id = uuid.uuid4().hex[:8]

        draft_result = await self._spawn(
            f"As a writer, complete: {task}", timeout,
        )
        draft = draft_result.output

        for rnd in range(rounds):
            review_result = await self._spawn(
                f"As a reviewer, evaluate this draft (round {rnd+1}):\n{draft}\n"
                f"Provide specific feedback. Format: SCORE: <0-10> | FEEDBACK: <text>",
                timeout,
            )
            feedback = review_result.output
            score = self._parse_score(feedback)

            revise_result = await self._spawn(
                f"As a writer, revise your draft based on this feedback:\n{feedback}\n\n"
                f"Original draft:\n{draft}\n\nProvide the revised version.",
                timeout,
            )
            revised = revise_result.output

            passes.append(ReviewPass(
                round=rnd + 1,
                draft=draft,
                feedback=feedback,
                revised=revised,
                score=score,
            ))
            # The revision becomes the draft for the next iteration.
            draft = revised

        return CollaborationResult(
            mode=CollaborationMode.REVIEW,
            agents=[writer_id, reviewer_id],
            rounds=rounds,
            final_output=draft,
            intermediate=passes,
            consensus=passes[-1].score if passes else 0.0,
            duration=time.time() - t0,
        )

    # ══════════════════════════════════════════
    # 4. Pipeline
    # ══════════════════════════════════════════

    async def pipeline(
        self,
        task: str,
        stages: int = 3,
        stage_names: list[str] | None = None,
        timeout: float | None = None,
    ) -> CollaborationResult:
        """Chain agents so that each stage's output feeds the next stage.

        Args:
            task: Initial input for the first stage.
            stages: Number of stages to run when ``stage_names`` is omitted.
                Negative values are treated as 0.
            stage_names: Custom stage names; when given, one stage is run per
                name and ``stages`` is ignored. Defaults to ``Analyzer``,
                ``Processor``, ``Refiner``, ``Polisher``, ``Validator``,
                followed by ``Stage 6``, ``Stage 7``, ... if more are needed.
            timeout: Per-agent timeout in seconds; falls back to the default.

        Returns:
            Result whose ``final_output`` is the last stage's output (or
            ``task`` itself when no stage ran), ``intermediate`` holds every
            stage's output and ``rounds`` is the number of stages run.
        """
        t0 = time.time()
        if stage_names is None:
            default_names = ("Analyzer", "Processor", "Refiner", "Polisher", "Validator")
            # Clamp at 0 so a negative count cannot slice from the end, and
            # generate names past the built-in five so that a larger count is
            # honoured rather than silently truncated.
            stage_names = [
                default_names[i] if i < len(default_names) else f"Stage {i + 1}"
                for i in range(max(stages, 0))
            ]

        intermediates: list[str] = []
        agent_ids: list[str] = []
        current_input = task

        for name in stage_names:
            result = await self._spawn(
                f"You are the {name} stage in a processing pipeline.\n"
                f"Input: {current_input}\nProcess and output for the next stage.",
                timeout,
            )
            current_input = result.output
            intermediates.append(current_input)
            agent_ids.append(result.agent_id)

        return CollaborationResult(
            mode=CollaborationMode.PIPELINE,
            agents=agent_ids,
            # Report the stages actually executed, which may differ from
            # ``stages`` when custom names are supplied.
            rounds=len(stage_names),
            final_output=current_input,
            intermediate=intermediates,
            consensus=1.0,
            duration=time.time() - t0,
        )

    # ══════════════════════════════════════════
    # 5. Ensemble
    # ══════════════════════════════════════════

    async def ensemble(
        self,
        task: str,
        agents: int = 3,
        merge_strategy: str = "best_of",
        timeout: float | None = None,
    ) -> CollaborationResult:
        """Have several agents solve ``task`` independently, then combine.

        Args:
            task: Description of what to solve.
            agents: Number of solver agents.
            merge_strategy: ``'best_of'`` returns the highest self-scored
                solution; ``'merge'`` asks one more agent to merge all
                solutions (each truncated to 500 characters); any other value
                is treated as ``'weighted'`` and lists every solution
                (truncated to 300 characters) with its share of the total
                score.
            timeout: Per-agent timeout in seconds; falls back to the default.

        Returns:
            Result whose ``intermediate`` is a list of
            ``(score, output, agent_id)`` tuples sorted by descending score,
            and whose ``consensus`` is the best score divided by 10.
        """
        t0 = time.time()

        tasks = [
            f"You are solver {i+1}/{agents}. {_PERSONAS[i % len(_PERSONAS)]}\n"
            f"Complete: {task}\nAt the end self-evaluate: SELF_SCORE: <0-10>"
            for i in range(agents)
        ]
        results = await self._spawn_many(tasks, timeout)

        # _parse_score returns 0-1; scale back to the 0-10 range for display.
        scored: list[tuple[float, str, str]] = [
            (self._parse_score(r.output) * 10, r.output, r.agent_id)
            for r in results
        ]
        scored.sort(reverse=True, key=lambda x: x[0])

        if not scored:
            return CollaborationResult(
                mode=CollaborationMode.ENSEMBLE,
                agents=[], rounds=1,
                final_output="No results.",
                duration=time.time() - t0,
            )

        if merge_strategy == "best_of":
            bs, bo, ba = scored[0]
            final = f"Best solution (score: {bs:.1f}/10) from [{ba}]:\n\n{bo}"
        elif merge_strategy == "merge":
            parts = [
                f"[Solution {i+1}, score={s:.1f}]:\n{o[:500]}"
                for i, (s, o, _) in enumerate(scored)
            ]
            merge_result = await self._spawn(
                f"As a meta-synthesizer, merge these {agents} solutions:\n\n"
                + "\n---\n".join(parts),
                timeout,
            )
            final = merge_result.output
        else:  # weighted
            total = sum(s for s, _, _ in scored)
            weights = [(s / total) if total else 0 for s, _, _ in scored]
            parts = [
                f"[{a}] weight={w:.2f}:\n{o[:300]}"
                for (_, o, a), w in zip(scored, weights)
            ]
            final = f"Weighted ensemble ({agents} solvers):\n\n" + "\n---\n".join(parts)

        return CollaborationResult(
            mode=CollaborationMode.ENSEMBLE,
            agents=[a for _, _, a in scored],
            rounds=1,
            final_output=final,
            intermediate=scored,
            consensus=scored[0][0] / 10.0,
            duration=time.time() - t0,
        )