Coverage for agentos/agent/pipeline.py: 33%
123 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2多Agent编排管道 — Conditional / Parallel / Router。
4v1.5.1: 支持条件路由(ConditionalPipeline)、并行扇出(ParallelPipeline)、
5 动态路由(RouterAgent) 三种生产级编排拓扑。
6"""
8from __future__ import annotations
10import asyncio
11import concurrent.futures
12from dataclasses import dataclass, field
13from typing import Any, Callable
15from agentos.agent.tool_agent import ToolAgent, AgentConfig, AgentResult
@dataclass
class PipelineAgent:
    """A single named agent node held by a pipeline.

    Bundles an agent with the name it is registered under and an optional
    configuration object.
    """

    # Name the node is registered under.
    name: str
    # The agent object whose ``run`` method the pipelines call.
    agent: ToolAgent
    # Optional configuration stored with the node; ``None`` when not supplied.
    config: AgentConfig | None = None
@dataclass
class PipelineResult:
    """Outcome of one pipeline run.

    Every field has a default, so a bare ``PipelineResult()`` is a successful
    result with no steps and empty output.
    """

    # Whether the run finished without a reported failure.
    success: bool = True
    # One dict per executed step; a fresh list is created for each instance.
    steps: list[dict] = field(default_factory=list)
    # Text produced by the pipeline as a whole.
    final_output: str = ""
    # Usage totals reported for the run.
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    total_duration_ms: float = 0.0
    # Error description; empty string when there is none.
    error: str = ""

    @property
    def output(self) -> str:
        """Return ``final_output`` (read-only alias)."""
        return self.final_output
@dataclass
class StepResult:
    """Wrapper pairing one step's agent result with the agent that produced it."""

    # Name of the agent that ran the step.
    agent_name: str
    # The result object returned by that agent.
    result: AgentResult
    # Optional key associated with the step's output; ``None`` by default.
    output_key: str | None = None
# ── ConditionalPipeline — conditional routing ───────────────────────

# Routing callback: receives an agent's output text and returns the name of
# the next agent to run.
ConditionFn = Callable[[str], str]
class ConditionalPipeline:
    """Multi-agent pipeline whose next hop is chosen by a condition function.

    After each agent finishes, the ``router`` callable receives that agent's
    final answer and returns the name of the agent to run next, which gives
    if/else or switch-style routing between the registered agents.

    Usage::

        cp = ConditionalPipeline()
        cp.add("classifier", classifier_agent)
        cp.add("legal", legal_agent)
        cp.add("tech", tech_agent)

        # Pick the next hop from the classifier's output.
        def route(output: str) -> str:
            if "legal" in output: return "legal"
            if "technical" in output: return "tech"
            return "__END__"

        result = cp.run("Is there a problem with this contract?", router=route)
    """

    # Name a router returns to stop the pipeline explicitly.
    _END = "__END__"

    def __init__(self, max_hops: int = 5):
        """Create an empty pipeline that runs at most ``max_hops`` agents per run."""
        self._agents: dict[str, PipelineAgent] = {}
        self._max_hops = max_hops

    def add(self, name: str, agent: ToolAgent, config: AgentConfig | None = None):
        """Register ``agent`` under ``name``, replacing any agent already using it."""
        self._agents[name] = PipelineAgent(name=name, agent=agent, config=config)

    def run(
        self,
        task: str,
        start_agent: str | None = None,
        router: ConditionFn | None = None,
    ) -> PipelineResult:
        """Run agents hop by hop, letting ``router`` pick each next agent.

        Args:
            task: Input for the first agent. Each later agent receives the
                previous agent's final answer as its task.
            start_agent: Name of the first agent; defaults to the first
                registered one.
            router: Maps an agent's final answer to the next agent's name.
                With no router only the start agent runs. Returning
                ``"__END__"`` or a name that is not registered ends the run.

        Returns:
            A ``PipelineResult`` with one step per executed hop and summed
            token, cost and duration totals. ``success`` is False when no
            agent is registered, ``max_hops`` is below 1, the start agent is
            unknown, or an agent reports failure (its error is propagated).
            Reaching ``max_hops`` ends the run without marking it as failed.
        """
        # Report an empty pipeline explicitly instead of the misleading
        # "Unknown start agent: None".
        if not self._agents:
            return PipelineResult(success=False, error="No agents registered")
        # With fewer than one hop no agent would run, so that must not be
        # reported as a successful run.
        if self._max_hops < 1:
            return PipelineResult(
                success=False,
                error=f"max_hops must be >= 1, got {self._max_hops}",
            )
        if start_agent is None:
            start_agent = next(iter(self._agents))
        if start_agent not in self._agents:
            return PipelineResult(success=False, error=f"Unknown start agent: {start_agent}")

        current = start_agent
        final_output = ""
        steps: list[dict] = []
        total_tokens = 0
        total_cost = 0.0
        total_ms = 0.0

        for hop in range(self._max_hops):
            result = self._agents[current].agent.run(task)

            steps.append({
                "hop": hop,
                "agent": current,
                "output": result.final_answer,
                "tokens": result.total_tokens,
                "cost": result.total_cost_usd,
                "duration_ms": result.total_duration_ms,
            })
            total_tokens += result.total_tokens
            total_cost += result.total_cost_usd
            total_ms += result.total_duration_ms
            final_output = result.final_answer

            # Stop at the first failing agent and surface its error.
            if not result.success:
                return PipelineResult(
                    success=False,
                    steps=steps,
                    final_output=final_output,
                    total_tokens=total_tokens,
                    total_cost_usd=total_cost,
                    total_duration_ms=total_ms,
                    error=result.error,
                )

            if router is None:
                break

            next_agent = router(result.final_answer)
            if next_agent == self._END or next_agent not in self._agents:
                break

            # The next hop consumes the current hop's output.
            task = result.final_answer
            current = next_agent

        return PipelineResult(
            success=True,
            steps=steps,
            final_output=final_output,
            total_tokens=total_tokens,
            total_cost_usd=total_cost,
            total_duration_ms=total_ms,
        )
# ── ParallelPipeline — parallel fan-out ─────────────────────────────
class ParallelPipeline:
    """Run several agents concurrently on the same task and merge their outputs.

    Every registered agent receives the same task and runs independently in a
    thread pool; the individual answers are then combined into one output.

    Usage::

        pp = ParallelPipeline()
        pp.add("analyst_1", agent_1)
        pp.add("analyst_2", agent_2)
        pp.add("analyst_3", agent_3)

        result = pp.run("Analyze the Q3 earnings report",
            aggregator=lambda results: "\\n---\\n".join(results.values()))
    """

    def __init__(self, max_workers: int = 5):
        """Create an empty pipeline using at most ``max_workers`` threads."""
        self._agents: dict[str, PipelineAgent] = {}
        self._max_workers = max_workers

    def add(self, name: str, agent: ToolAgent, config: AgentConfig | None = None):
        """Register ``agent`` under ``name``, replacing any agent already using it."""
        self._agents[name] = PipelineAgent(name=name, agent=agent, config=config)

    def run(self, task: str, aggregator: Callable[[dict[str, str]], str] | None = None) -> PipelineResult:
        """Run all agents on ``task`` in parallel and aggregate their answers.

        Args:
            task: The task passed unchanged to every agent.
            aggregator: Receives ``{agent name: final answer}`` in
                registration order and returns the merged output. When
                omitted, answers are joined as ``## name`` sections.

        Returns:
            A ``PipelineResult`` whose steps follow registration order.
            Tokens and cost are summed; duration is the slowest agent's
            duration because the agents run concurrently. ``success`` is
            False when no agent is registered, when any agent raises, or
            when any agent reports failure; the messages are joined in
            ``error``.
        """
        if not self._agents:
            return PipelineResult(success=False, error="No agents registered")

        finished: dict[str, AgentResult] = {}
        raised: dict[str, str] = {}

        with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_workers) as pool:
            futures = {
                pool.submit(pa.agent.run, task): name
                for name, pa in self._agents.items()
            }
            for fut in concurrent.futures.as_completed(futures):
                name = futures[fut]
                try:
                    finished[name] = fut.result()
                except Exception as exc:
                    # Keep going: one crashing agent must not discard the others.
                    raised[name] = f"{name}: {exc}"

        # as_completed yields in completion order, which varies between runs;
        # rebuild everything in registration order for a deterministic result.
        results: dict[str, AgentResult] = {}
        errors: list[str] = []
        for name in self._agents:
            if name in raised:
                errors.append(raised[name])
                continue
            result = finished[name]
            results[name] = result
            # An agent that returns a failed result counts as a pipeline
            # failure, as it does in the other pipelines of this module.
            if not result.success:
                errors.append(f"{name}: {result.error or 'agent reported failure'}")

        if not results:
            return PipelineResult(success=False, error="; ".join(errors))

        total_tokens = sum(r.total_tokens for r in results.values())
        total_cost = sum((r.total_cost_usd for r in results.values()), 0.0)
        total_ms = max((r.total_duration_ms for r in results.values()), default=0.0)

        raw_outputs = {name: r.final_answer for name, r in results.items()}
        if aggregator is not None:
            final = aggregator(raw_outputs)
        else:
            final = "\n\n".join(
                f"## {name}\n{out}" for name, out in raw_outputs.items()
            )

        return PipelineResult(
            success=not errors,
            steps=[
                {"agent": name, "output": r.final_answer, "tokens": r.total_tokens}
                for name, r in results.items()
            ],
            final_output=final,
            total_tokens=total_tokens,
            total_cost_usd=total_cost,
            total_duration_ms=total_ms,
            error="; ".join(errors),
        )
# ── RouterAgent — dynamic routing ───────────────────────────────────

# Routing callback: receives the task text and returns a pair
# (next_agent_name, rewritten_task).
RouterFn = Callable[[str], tuple[str, str]]
class RouterAgent:
    """Dynamic routing orchestrator that picks an agent based on the task.

    A classifier agent first analyzes the task and names a route; the task is
    then handed to the agent registered under that route.

    Usage::

        ra = RouterAgent(classifier_agent)
        ra.register("code", code_agent, description="code generation / debugging tasks")
        ra.register("writing", writer_agent, description="writing / translation / summarization tasks")
        ra.register("research", research_agent, description="research / search / analysis tasks")

        result = ra.run("Write a quicksort in Python for me")
    """

    def __init__(self, classifier: ToolAgent):
        """Create a router that asks ``classifier`` which route fits a task."""
        self._classifier = classifier
        self._routes: dict[str, tuple[ToolAgent, str]] = {}

    def register(self, name: str, agent: ToolAgent, description: str = ""):
        """Register ``agent`` as route ``name``; ``description`` is shown to the classifier."""
        self._routes[name] = (agent, description)

    def run(self, task: str) -> PipelineResult:
        """Classify ``task``, run the matching agent, and report both steps.

        Args:
            task: The user task; it is passed unchanged to the chosen agent.

        Returns:
            A ``PipelineResult`` with a ``"router"`` step followed by the
            chosen agent's step. Totals include the classifier call as well
            as the chosen agent. ``success`` mirrors the chosen agent's
            result and ``error`` carries its error on failure. When no route
            is registered a failed result is returned.
        """
        if not self._routes:
            return PipelineResult(success=False, error="No routes registered")

        # Build the classification prompt.
        route_desc = "\n".join(
            f"- {name}: {desc}" for name, (_, desc) in self._routes.items()
        )
        classify_task = (
            f"Analyze the following task and output ONLY the best matching route name "
            f"from the list below. Output just the name, nothing else.\n\n"
            f"Available routes:\n{route_desc}\n\n"
            f"Task: {task}\n\n"
            f"Route:"
        )

        class_result = self._classifier.run(classify_task)
        matched = self._match_route(class_result.final_answer)

        agent, _ = self._routes[matched]
        result = agent.run(task)

        return PipelineResult(
            success=result.success,
            steps=[
                {
                    "agent": "router",
                    "output": f"Classified as: {matched}",
                    "tokens": class_result.total_tokens,
                },
                {"agent": matched, "output": result.final_answer, "tokens": result.total_tokens},
            ],
            final_output=result.final_answer,
            # The classifier call is part of the run, so its usage is counted.
            total_tokens=class_result.total_tokens + result.total_tokens,
            total_cost_usd=class_result.total_cost_usd + result.total_cost_usd,
            # The two calls run one after the other, so durations add up.
            total_duration_ms=class_result.total_duration_ms + result.total_duration_ms,
            error="" if result.success else result.error,
        )

    def _match_route(self, answer: str) -> str:
        """Map the classifier's raw answer to a registered route name.

        Tries a case-insensitive exact match first, then the longest route
        name contained in the answer (so ``code_review`` wins over ``code``),
        and finally falls back to the first registered route.
        """
        target = (answer or "").strip().lower()

        for name in self._routes:
            if name.lower() == target:
                return name

        contained = [name for name in self._routes if name.lower() in target]
        if contained:
            # max() returns the first of equally long names, so ties keep
            # registration order.
            return max(contained, key=len)

        # Nothing recognizable in the answer: fall back to the first route.
        return next(iter(self._routes))