Coverage for agentos/subagent/collaboration.py: 34%
205 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Agent 协作模式 — Debate/Vote/Review/Pipeline/Ensemble。
3基于 SubAgentManager + 父子通信之上,提供高级多Agent协作原语。
5使用示例::
7 mgr = SubAgentManager()
8 collab = AgentCollaboration(mgr)
10 result = await collab.debate("Python vs Rust for web backend", agents=2)
11 result = await collab.vote(["方案A", "方案B", "方案C"], agents=5)
12 result = await collab.review("写一篇关于AI安全的文章", rounds=2)
13 result = await collab.pipeline("分析Q2财报数据", stages=3)
14 result = await collab.ensemble("设计系统架构方案", agents=3)
15"""
17from __future__ import annotations
19import asyncio
20import re
21import time
22import uuid
23from dataclasses import dataclass, field
24from enum import Enum
25from typing import Any, Callable, Awaitable
27from .manager import SubAgentManager, SubAgentSpec, SubAgentResult
28from .parent_child import ChildStatus, ChildContext, SharedState, ChildHandle
31# ──────────────────────────────────────────────
32# 枚举与数据类型
33# ──────────────────────────────────────────────
class CollaborationMode(str, Enum):
    """Tag naming the collaboration primitive that produced a result.

    Members subclass ``str``, so each one compares equal to its lowercase
    value (e.g. ``CollaborationMode.DEBATE == "debate"``).
    """

    # Several agents argue a topic and a judge agent synthesizes a verdict.
    DEBATE = "debate"
    # Several agents each pick one option from a list; ballots are tallied.
    VOTE = "vote"
    # A draft is alternately reviewed and revised for a number of rounds.
    REVIEW = "review"
    # Agents run in sequence; each stage's output feeds the next stage.
    PIPELINE = "pipeline"
    # Agents solve the same task independently; the results are combined.
    ENSEMBLE = "ensemble"
class VoteStrategy(str, Enum):
    """How ballots are turned into a winner by ``AgentCollaboration.vote``.

    Members subclass ``str`` and compare equal to their lowercase value.
    """

    # The option with the most ballots wins.
    MAJORITY = "majority"
    # The option with the highest summed ballot confidence wins.
    WEIGHTED = "weighted"
    # NOTE(review): ``vote`` has no dedicated branch for this member; it is
    # currently tallied exactly like MAJORITY.
    RANKED = "ranked"
    # Succeeds only when every requested voter cast a ballot for one option.
    UNANIMOUS = "unanimous"
51@dataclass
52class DebateRound:
53 """一轮辩论。"""
54 round: int
55 arguments: list[str] # 各方论点
56 rebuttals: list[str] # 反驳
57 winner: int | None = None
@dataclass
class VoteBallot:
    """A single ballot cast by one voting agent."""

    # Identifier of the agent that cast the ballot.
    agent_id: str
    # The chosen option (the option text itself, not its number).
    choice: str
    # Voter's self-reported confidence; used as the ballot weight by the
    # weighted strategy.
    confidence: float = 1.0
    # Short free-text justification supplied by the voter.
    reasoning: str = ""
@dataclass
class ReviewPass:
    """One review-and-revise iteration."""

    # 1-based iteration number.
    round: int
    # Text that was handed to the reviewer.
    draft: str
    # Raw reviewer output.
    feedback: str
    # Text produced by the writer after applying the feedback.
    revised: str
    # Reviewer's score for ``draft``.
    score: float = 0.0
@dataclass
class CollaborationResult:
    """Outcome of one collaboration run, whatever the mode."""

    # Which collaboration primitive produced this result.
    mode: CollaborationMode
    # Identifiers of the agents credited with the run.
    agents: list[str]
    # Number of rounds / stages reported by the mode.
    rounds: int
    # The final text produced by the run.
    final_output: str
    # Mode-specific intermediate records (e.g. debate rounds, ballots,
    # review passes, stage outputs). Each instance gets its own list.
    intermediate: list[Any] = field(default_factory=list)
    # Mode-specific agreement / quality indicator.
    consensus: float = 0.0
    # Elapsed time of the run, in seconds.
    duration: float = 0.0
91# ──────────────────────────────────────────────
92# 角色性格库
93# ──────────────────────────────────────────────
95_PERSONAS = [
96 "Be logical, data-driven, and cite evidence.",
97 "Be creative, think outside the box, challenge assumptions.",
98 "Focus on practical concerns: cost, timeline, feasibility.",
99 "Advocate for user experience and human-centered design.",
100 "Take a contrarian stance, find flaws in all arguments.",
101]
103# ──────────────────────────────────────────────
104# 核心协作引擎
105# ──────────────────────────────────────────────
class AgentCollaboration:
    """Multi-agent collaboration engine layered on a ``SubAgentManager``.

    Offers five collaboration primitives: :meth:`debate`, :meth:`vote`,
    :meth:`review`, :meth:`pipeline` and :meth:`ensemble`. Every primitive
    builds text prompts, runs them through the manager, and wraps the outcome
    in a ``CollaborationResult``.

    Args:
        manager: Manager used to spawn agents. A new ``SubAgentManager`` is
            created when this is omitted.
        run_func: Execution callable forwarded unchanged to the manager's
            ``spawn_fork`` / ``spawn_swarm`` calls.
        default_timeout: Per-agent timeout in seconds, used whenever a call
            does not supply its own timeout.
    """

    # Unsigned decimal literal ("8", "8.5", ".5"). It intentionally does not
    # swallow trailing punctuation, so "SCORE: 8.5." parses as 8.5 instead of
    # capturing "8.5." and making float() raise ValueError.
    _NUMBER = r"(\d+(?:\.\d+)?|\.\d+)"
    _SCORE_RE = re.compile(r"SCORE:\s*" + _NUMBER, re.IGNORECASE)
    _CHOICE_RE = re.compile(r"CHOICE:\s*(\d+)", re.IGNORECASE)
    _CONFIDENCE_RE = re.compile(r"CONFIDENCE:\s*" + _NUMBER, re.IGNORECASE)
    _REASONING_RE = re.compile(
        r"REASONING:\s*(.+?)(?:\n|$)", re.IGNORECASE | re.DOTALL
    )
    # Names used for pipeline stages when the caller does not provide any.
    _DEFAULT_STAGE_NAMES = (
        "Analyzer", "Processor", "Refiner", "Polisher", "Validator",
    )

    def __init__(
        self,
        manager: SubAgentManager | None = None,
        run_func: Callable[[SubAgentSpec, ChildContext], Awaitable[tuple[str, int]]] | None = None,
        default_timeout: float | None = 300.0,
    ):
        self._mgr = manager or SubAgentManager()
        self._run = run_func
        self._timeout = default_timeout

    # ── Convenience accessors ───────────────────

    @property
    def manager(self) -> SubAgentManager:
        """The underlying ``SubAgentManager``."""
        return self._mgr

    @property
    def shared_state(self) -> SharedState:
        """The manager's shared state object."""
        return self._mgr.shared_state

    async def cancel_all(self) -> None:
        """Ask the manager to cancel all of its agents."""
        await self._mgr.cancel_all()

    @property
    def active_agents(self) -> int:
        """The manager's ``active_children`` value."""
        return self._mgr.active_children

    # ── Internal helpers ────────────────────────

    async def _spawn(self, task: str, timeout: float | None = None) -> SubAgentResult:
        """Run a single ``task`` through the manager's ``spawn_fork``.

        A falsy ``timeout`` (``None`` or ``0``) falls back to the default
        timeout given at construction.
        """
        return await self._mgr.spawn_fork(
            task=task,
            run_func=self._run,
            timeout=timeout or self._timeout,
        )

    async def _spawn_many(self, tasks: list[str], timeout: float | None = None) -> list[SubAgentResult]:
        """Run several ``tasks`` through the manager's ``spawn_swarm``.

        A falsy ``timeout`` (``None`` or ``0``) falls back to the default
        timeout given at construction.
        """
        return await self._mgr.spawn_swarm(
            tasks=tasks,
            run_func=self._run,
            timeout=timeout or self._timeout,
        )

    @staticmethod
    def _parse_score(output: str) -> float:
        """Extract a ``SCORE: N`` rating from ``output``, normalized to 0..1.

        The match is case-insensitive and unanchored, so ``SELF_SCORE: 7``
        is recognized as well. The number is clamped to 0..10 and divided by
        10. When no score is present, 0.7 is returned.
        """
        found = AgentCollaboration._SCORE_RE.search(output or "")
        if found is None:
            return 0.7
        clamped = max(0.0, min(10.0, float(found.group(1))))
        return clamped / 10.0

    @staticmethod
    def _parse_choice(
        output: str, option_count: int
    ) -> tuple[int | None, float, str]:
        """Parse ``CHOICE: N | CONFIDENCE: X | REASONING: text`` from ``output``.

        Returns:
            ``(choice, confidence, reasoning)`` where

            * ``choice`` is the 1-based option number, or ``None`` when it is
              missing or outside ``1..option_count``;
            * ``confidence`` is clamped to 0..1 and defaults to 0.5;
            * ``reasoning`` is the first line after ``REASONING:`` (at most
              200 characters), defaulting to the first 200 characters of
              ``output``.
        """
        text = output or ""
        choice: int | None = None
        confidence = 0.5
        reasoning = text[:200]

        choice_match = AgentCollaboration._CHOICE_RE.search(text)
        if choice_match is not None:
            number = int(choice_match.group(1))
            if 1 <= number <= option_count:
                choice = number

        confidence_match = AgentCollaboration._CONFIDENCE_RE.search(text)
        if confidence_match is not None:
            confidence = max(0.0, min(1.0, float(confidence_match.group(1))))

        reasoning_match = AgentCollaboration._REASONING_RE.search(text)
        if reasoning_match is not None:
            reasoning = reasoning_match.group(1).strip()[:200]

        return choice, confidence, reasoning

    # ══════════════════════════════════════════
    # 1. Debate
    # ══════════════════════════════════════════

    async def debate(
        self,
        topic: str,
        agents: int = 2,
        rounds: int = 3,
        timeout: float | None = None,
    ) -> CollaborationResult:
        """Let several agents debate ``topic``; a judge agent gives the verdict.

        Flow:
            Round 0: every debater presents an opening argument.
            Rounds 1..rounds-1: every debater rebuts the opponents' most
                recent statements and strengthens its own position.
            Judge: one more agent synthesizes the opening arguments and the
                final rebuttals into a verdict.

        Args:
            topic: Subject of the debate.
            agents: Number of debaters.
            rounds: Number of debate rounds, including the opening round.
            timeout: Per-agent timeout in seconds; the default timeout is
                used when omitted.

        Returns:
            A ``CollaborationResult`` whose ``final_output`` is the judge's
            output, whose ``intermediate`` is the list of ``DebateRound``
            records, and whose ``rounds`` is ``rounds + 1`` (the judge pass
            is counted).
        """
        started = time.monotonic()
        history: list[DebateRound] = []
        all_agent_ids: list[str] = []

        # Round 0 — opening arguments
        opening_tasks = [
            f"Debate topic: {topic}\n"
            f"You are debater {chr(65+i)}. {_PERSONAS[i % len(_PERSONAS)]}\n"
            f"Present your opening argument."
            for i in range(agents)
        ]
        opening_results = await self._spawn_many(opening_tasks, timeout)
        openings = [r.output for r in opening_results]
        history.append(DebateRound(round=0, arguments=openings, rebuttals=[]))
        all_agent_ids.extend(r.agent_id for r in opening_results)

        # Work with the statements that actually came back rather than the
        # requested count, so a short result list cannot cause an IndexError.
        debaters = len(openings)
        # Each debater's most recent statement; starts as the openings and is
        # replaced by rebuttals so later rounds answer the latest positions
        # instead of re-answering the openings every time.
        latest = list(openings)

        # Rounds 1..N — rebuttals
        for rnd in range(1, rounds):
            rebut_tasks = []
            for i in range(debaters):
                opponent_args = [
                    statement for j, statement in enumerate(latest) if j != i
                ]
                joined = " | ".join(opponent_args)
                rebut_tasks.append(
                    f"Debate topic: {topic}\n"
                    f"You are debater {chr(65+i)}. {_PERSONAS[i % len(_PERSONAS)]}\n"
                    f"Opponent arguments: {joined}\n"
                    f"Provide your rebuttal and strengthen your position."
                )
            rebut_results = await self._spawn_many(rebut_tasks, timeout)
            rebuttals = [r.output for r in rebut_results]
            history.append(DebateRound(
                round=rnd,
                arguments=list(latest),  # the statements rebutted this round
                rebuttals=rebuttals,
            ))
            all_agent_ids.extend(r.agent_id for r in rebut_results)
            # A debater with no rebuttal this round keeps its prior statement.
            latest = [
                rebuttals[i] if i < len(rebuttals) else latest[i]
                for i in range(debaters)
            ]

        # Judge summary
        final_rebuttals = history[-1].rebuttals
        sections = []
        for i in range(debaters):
            closing = final_rebuttals[i] if i < len(final_rebuttals) else "N/A"
            sections.append(
                f"Debater {chr(65+i)} initial: {openings[i]}\n"
                f"Debater {chr(65+i)} final rebuttal: {closing}"
            )
        all_args = "\n\n".join(sections)
        judge = await self._spawn(
            f"As an impartial judge, synthesize this debate on '{topic}' and give your verdict:\n"
            f"{all_args}",
            timeout,
        )
        all_agent_ids.append(judge.agent_id)

        return CollaborationResult(
            mode=CollaborationMode.DEBATE,
            agents=all_agent_ids,
            rounds=rounds + 1,
            final_output=judge.output,
            intermediate=history,
            # Heuristic only: grows with the number of debaters, capped at 5.
            consensus=0.5 + 0.1 * min(agents, 5),
            duration=time.monotonic() - started,
        )

    # ══════════════════════════════════════════
    # 2. Vote
    # ══════════════════════════════════════════

    async def vote(
        self,
        options: list[str],
        agents: int = 3,
        strategy: VoteStrategy = VoteStrategy.MAJORITY,
        timeout: float | None = None,
    ) -> CollaborationResult:
        """Have several agents vote for one of ``options``.

        Args:
            options: Candidate options; voters answer with a 1-based number.
            agents: Number of voting agents.
            strategy: How ballots are tallied. ``WEIGHTED`` sums ballot
                confidence, ``UNANIMOUS`` requires every requested voter to
                pick the same option, and any other value (including
                ``RANKED``) is tallied as a simple majority.
            timeout: Per-agent timeout in seconds; the default timeout is
                used when omitted.

        Returns:
            A ``CollaborationResult`` whose ``final_output`` is a text report
            and whose ``intermediate`` is the list of valid ``VoteBallot``
            objects. Replies without a valid choice are ignored; if none are
            valid the report is ``"No valid votes cast."``.
        """
        started = time.monotonic()
        option_list = "\n".join(f"{i+1}. {opt}" for i, opt in enumerate(options))

        vote_tasks = [
            f"You are voter {i+1}/{agents}. {_PERSONAS[i % len(_PERSONAS)]}\n"
            f"Evaluate these options and vote for ONE:\n{option_list}\n"
            f"Format: CHOICE: <number> | CONFIDENCE: <0.0-1.0> | REASONING: <text>"
            for i in range(agents)
        ]
        results = await self._spawn_many(vote_tasks, timeout)

        ballots: list[VoteBallot] = []
        for r in results:
            picked, conf, reason = self._parse_choice(r.output, len(options))
            if picked is None:
                continue  # missing or out-of-range choice: not counted
            ballots.append(VoteBallot(
                agent_id=r.agent_id,
                choice=options[picked - 1],
                confidence=conf,
                reasoning=reason,
            ))

        if not ballots:
            return CollaborationResult(
                mode=CollaborationMode.VOTE,
                agents=[],
                rounds=1,
                final_output="No valid votes cast.",
                consensus=0.0,
                duration=time.monotonic() - started,
            )

        tally: dict[str, int] = {}
        weighted: dict[str, float] = {}
        for b in ballots:
            tally[b.choice] = tally.get(b.choice, 0) + 1
            weighted[b.choice] = weighted.get(b.choice, 0) + b.confidence

        if strategy == VoteStrategy.WEIGHTED:
            winner = max(weighted, key=weighted.get)
            total_weight = sum(weighted.values())
            # Every ballot may carry confidence 0; avoid dividing by zero.
            consensus = weighted[winner] / total_weight if total_weight else 0.0
            summary = f"Weighted winner: {winner} (score: {weighted[winner]:.2f})"
        elif strategy == VoteStrategy.UNANIMOUS:
            # Unanimity needs one option only AND a valid ballot from every
            # requested voter.
            if len(tally) == 1 and len(ballots) == agents:
                winner, consensus = next(iter(tally)), 1.0
                summary = f"Unanimous: {winner}"
            else:
                winner, consensus = "NO CONSENSUS", 0.0
                summary = "Unanimous vote FAILED"
        else:
            winner = max(tally, key=tally.get)
            consensus = tally[winner] / len(ballots)
            summary = f"Majority winner: {winner} ({tally[winner]}/{len(ballots)} votes)"

        detail_lines = [
            f"- [{b.agent_id}] '{b.choice}' (conf={b.confidence:.2f}): {b.reasoning}\n"
            for b in ballots
        ]
        report = f"{summary}\n\nVote details:\n" + "".join(detail_lines)

        return CollaborationResult(
            mode=CollaborationMode.VOTE,
            agents=[b.agent_id for b in ballots],
            rounds=1,
            final_output=report,
            intermediate=ballots,
            consensus=consensus,
            duration=time.monotonic() - started,
        )

    # ══════════════════════════════════════════
    # 3. Review
    # ══════════════════════════════════════════

    async def review(
        self,
        task: str,
        rounds: int = 2,
        timeout: float | None = None,
    ) -> CollaborationResult:
        """Produce a draft, then alternate reviewer feedback and revision.

        Flow:
            1. A writer agent produces the first draft.
            2. A reviewer agent scores the draft and gives feedback.
            3. A writer agent revises the draft using that feedback.
            4. Steps 2-3 repeat ``rounds`` times.

        Args:
            task: What the writer should produce.
            rounds: Number of review/revise iterations.
            timeout: Per-agent timeout in seconds; the default timeout is
                used when omitted.

        Returns:
            A ``CollaborationResult`` whose ``final_output`` is the last
            revision, whose ``intermediate`` is the list of ``ReviewPass``
            records, and whose ``consensus`` is the score from the last
            review (0.0 when no review ran).
        """
        started = time.monotonic()
        passes: list[ReviewPass] = []
        # NOTE(review): these are synthetic role labels, not the agent ids of
        # the spawned agents (unlike the other modes, which report real ids).
        writer_id = uuid.uuid4().hex[:8]
        reviewer_id = uuid.uuid4().hex[:8]

        draft_result = await self._spawn(
            f"As a writer, complete: {task}", timeout,
        )
        draft = draft_result.output

        for rnd in range(rounds):
            review_result = await self._spawn(
                f"As a reviewer, evaluate this draft (round {rnd+1}):\n{draft}\n"
                f"Provide specific feedback. Format: SCORE: <0-10> | FEEDBACK: <text>",
                timeout,
            )
            feedback = review_result.output
            score = self._parse_score(feedback)

            revise_result = await self._spawn(
                f"As a writer, revise your draft based on this feedback:\n{feedback}\n\n"
                f"Original draft:\n{draft}\n\nProvide the revised version.",
                timeout,
            )
            revised = revise_result.output

            passes.append(ReviewPass(
                round=rnd + 1,
                draft=draft,
                feedback=feedback,
                revised=revised,
                score=score,
            ))
            # The revision becomes the draft for the next iteration.
            draft = revised

        return CollaborationResult(
            mode=CollaborationMode.REVIEW,
            agents=[writer_id, reviewer_id],
            rounds=rounds,
            final_output=draft,
            intermediate=passes,
            consensus=passes[-1].score if passes else 0.0,
            duration=time.monotonic() - started,
        )

    # ══════════════════════════════════════════
    # 4. Pipeline
    # ══════════════════════════════════════════

    async def pipeline(
        self,
        task: str,
        stages: int = 3,
        stage_names: list[str] | None = None,
        timeout: float | None = None,
    ) -> CollaborationResult:
        """Run agents in sequence; each stage's output is the next one's input.

        Args:
            task: Input handed to the first stage.
            stages: Number of stages to run when ``stage_names`` is omitted.
                Values below 1 run no stage at all.
            stage_names: Custom stage names. When given, one stage runs per
                name and ``stages`` is ignored. The defaults are
                ``Analyzer, Processor, Refiner, Polisher, Validator``,
                followed by generated names if more than five stages are
                requested.
            timeout: Per-agent timeout in seconds; the default timeout is
                used when omitted.

        Returns:
            A ``CollaborationResult`` whose ``final_output`` is the last
            stage's output (or ``task`` itself when no stage ran), whose
            ``intermediate`` lists every stage output in order, and whose
            ``rounds`` is the number of stages actually run.
        """
        started = time.monotonic()
        if stage_names is None:
            # Clamp so a negative count cannot act as a "drop from the end"
            # slice, and top up with generated names so asking for more than
            # the built-in five stages is honoured instead of truncated.
            wanted = max(stages, 0)
            defaults = self._DEFAULT_STAGE_NAMES
            stage_names = list(defaults[:wanted]) + [
                f"Stage-{n + 1} Processor" for n in range(len(defaults), wanted)
            ]

        intermediates: list[str] = []
        agent_ids: list[str] = []
        current_input = task

        for name in stage_names:
            result = await self._spawn(
                f"You are the {name} stage in a processing pipeline.\n"
                f"Input: {current_input}\nProcess and output for the next stage.",
                timeout,
            )
            current_input = result.output
            intermediates.append(current_input)
            agent_ids.append(result.agent_id)

        return CollaborationResult(
            mode=CollaborationMode.PIPELINE,
            agents=agent_ids,
            # Report what actually ran; custom stage_names may differ in
            # length from the ``stages`` argument.
            rounds=len(stage_names),
            final_output=current_input,
            intermediate=intermediates,
            consensus=1.0,
            duration=time.monotonic() - started,
        )

    # ══════════════════════════════════════════
    # 5. Ensemble
    # ══════════════════════════════════════════

    async def ensemble(
        self,
        task: str,
        agents: int = 3,
        merge_strategy: str = "best_of",
        timeout: float | None = None,
    ) -> CollaborationResult:
        """Have several agents solve ``task`` independently, then combine.

        Args:
            task: Description of what to solve.
            agents: Number of solver agents.
            merge_strategy: ``'best_of'`` returns the highest self-scored
                solution; ``'merge'`` asks one more agent to merge all
                solutions; any other value (documented: ``'weighted'``)
                lists every solution with a weight proportional to its
                self-score.
            timeout: Per-agent timeout in seconds; the default timeout is
                used when omitted.

        Returns:
            A ``CollaborationResult`` whose ``intermediate`` is a list of
            ``(score_0_to_10, output, agent_id)`` tuples sorted by score,
            highest first, and whose ``consensus`` is the top score scaled
            to 0..1. When no solver result comes back, ``final_output`` is
            ``"No results."``.
        """
        started = time.monotonic()

        tasks = [
            f"You are solver {i+1}/{agents}. {_PERSONAS[i % len(_PERSONAS)]}\n"
            f"Complete: {task}\nAt the end self-evaluate: SELF_SCORE: <0-10>"
            for i in range(agents)
        ]
        results = await self._spawn_many(tasks, timeout)

        # _parse_score yields 0..1; scale back to the 0..10 range for display.
        scored: list[tuple[float, str, str]] = [
            (self._parse_score(r.output) * 10, r.output, r.agent_id)
            for r in results
        ]
        scored.sort(reverse=True, key=lambda x: x[0])

        if not scored:
            return CollaborationResult(
                mode=CollaborationMode.ENSEMBLE,
                agents=[], rounds=1,
                final_output="No results.",
                duration=time.monotonic() - started,
            )

        if merge_strategy == "best_of":
            best_score, best_output, best_agent = scored[0]
            final = f"Best solution (score: {best_score:.1f}/10) from [{best_agent}]:\n\n{best_output}"
        elif merge_strategy == "merge":
            # Each solution is truncated to 500 characters for the prompt.
            parts = [
                f"[Solution {i+1}, score={s:.1f}]:\n{o[:500]}"
                for i, (s, o, _) in enumerate(scored)
            ]
            merge_result = await self._spawn(
                # Count the solutions actually present, not the number asked for.
                f"As a meta-synthesizer, merge these {len(scored)} solutions:\n\n"
                + "\n---\n".join(parts),
                timeout,
            )
            final = merge_result.output
        else:  # weighted
            total = sum(s for s, _, _ in scored)
            # All-zero scores give every solution weight 0 rather than
            # dividing by zero.
            weights = [(s / total) if total else 0 for s, _, _ in scored]
            parts = [
                f"[{a}] weight={w:.2f}:\n{o[:300]}"
                for (_, o, a), w in zip(scored, weights)
            ]
            final = f"Weighted ensemble ({len(scored)} solvers):\n\n" + "\n---\n".join(parts)

        return CollaborationResult(
            mode=CollaborationMode.ENSEMBLE,
            agents=[a for _, _, a in scored],
            rounds=1,
            final_output=final,
            intermediate=scored,
            consensus=scored[0][0] / 10.0,
            duration=time.monotonic() - started,
        )