Coverage for agentos/agent/pipeline.py: 33%

123 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

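"""
Multi-agent orchestration pipelines — Conditional / Parallel / Router.

v1.5.1: supports three production-grade orchestration topologies:
conditional routing (ConditionalPipeline), parallel fan-out
(ParallelPipeline), and dynamic routing (RouterAgent).
"""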

7 

8from __future__ import annotations 

9 

10import asyncio 

11import concurrent.futures 

12from dataclasses import dataclass, field 

13from typing import Any, Callable 

14 

15from agentos.agent.tool_agent import ToolAgent, AgentConfig, AgentResult 

16 

17 

@dataclass
class PipelineAgent:
    """A single agent node registered in a pipeline."""

    # Name the node is registered under.
    name: str
    # Agent object whose ``run`` method the pipelines call.
    agent: ToolAgent
    # Optional per-node configuration; stored as given.
    config: AgentConfig | None = None

24 

25 

@dataclass
class PipelineResult:
    """Outcome of one pipeline execution."""

    # False when the pipeline could not complete successfully.
    success: bool = True
    # One dict per executed step, in the order the pipeline recorded them.
    steps: list[dict] = field(default_factory=list)
    # Text produced by the pipeline as a whole.
    final_output: str = ""
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    total_duration_ms: float = 0.0
    # Human-readable failure description; empty when there is none.
    error: str = ""

    @property
    def output(self) -> str:
        """Return ``final_output`` (read-only alias)."""
        return self.final_output

40 

41 

@dataclass
class StepResult:
    """Wrapper pairing one step's agent name with the result it produced."""

    # Name of the agent that ran the step.
    agent_name: str
    # Result object returned by that agent.
    result: AgentResult
    # Optional key associated with the step's output; not read by the code
    # visible in this module.
    output_key: str | None = None

48 

49 

# ── ConditionalPipeline — conditional routing ──────────────────────

# Maps the output of the agent that just ran to the name of the next agent.
ConditionFn = Callable[[str], str]


class ConditionalPipeline:
    """Multi-agent pipeline whose next hop is chosen by a router function.

    After each agent finishes, the router function receives that agent's
    answer and returns the name of the agent to call next, which allows
    if/else or switch-case style routing.

    Usage::

        cp = ConditionalPipeline()
        cp.add("classifier", classifier_agent)
        cp.add("legal", legal_agent)
        cp.add("tech", tech_agent)

        # Pick the next hop from the classifier's output.
        def route(output: str) -> str:
            if "legal" in output: return "legal"
            if "tech" in output: return "tech"
            return "__END__"

        result = cp.run("Is there a problem with this contract?", router=route)
    """

    def __init__(self, max_hops: int = 5):
        """Create an empty pipeline that runs at most ``max_hops`` agents per run."""
        self._agents: dict[str, PipelineAgent] = {}
        self._max_hops = max_hops

    def add(self, name: str, agent: ToolAgent, config: AgentConfig | None = None):
        """Register ``agent`` under ``name``, replacing any agent with that name."""
        self._agents[name] = PipelineAgent(name=name, agent=agent, config=config)

    def run(
        self,
        task: str,
        start_agent: str | None = None,
        router: ConditionFn | None = None,
    ) -> PipelineResult:
        """Run agents hop by hop, starting at ``start_agent``.

        Args:
            task: Input for the first agent. Every later agent receives the
                previous agent's answer as its input.
            start_agent: Name of the first agent; defaults to the first
                registered agent.
            router: Called with each agent's answer to choose the next agent.
                When omitted, only the start agent runs.

        Returns:
            A ``PipelineResult`` whose ``steps`` hold one entry per hop and
            whose totals are summed over all hops. ``success`` is False when
            no start agent can be resolved or when an agent reports failure;
            in the latter case the steps executed so far are still included.

        Note:
            Routing stops without an error when the router returns
            ``"__END__"``, returns a name that is not registered, or when
            ``max_hops`` agents have run.
        """
        if start_agent is None:
            if not self._agents:
                # Same message ParallelPipeline uses for an empty pipeline,
                # instead of the misleading "Unknown start agent: None".
                return PipelineResult(success=False, error="No agents registered")
            start_agent = next(iter(self._agents))
        if start_agent not in self._agents:
            return PipelineResult(success=False, error=f"Unknown start agent: {start_agent}")

        current = start_agent
        pipeline_output = ""
        steps: list[dict] = []
        total_tokens = 0
        total_cost = 0.0
        total_ms = 0.0

        for hop in range(self._max_hops):
            result = self._agents[current].agent.run(task)

            steps.append({
                "hop": hop,
                "agent": current,
                "output": result.final_answer,
                "tokens": result.total_tokens,
                "cost": result.total_cost_usd,
                "duration_ms": result.total_duration_ms,
            })
            total_tokens += result.total_tokens
            total_cost += result.total_cost_usd
            total_ms += result.total_duration_ms
            pipeline_output = result.final_answer

            if not result.success:
                # Abort on the first failing agent, keeping what ran so far.
                return PipelineResult(
                    success=False,
                    steps=steps,
                    final_output=pipeline_output,
                    total_tokens=total_tokens,
                    total_cost_usd=total_cost,
                    total_duration_ms=total_ms,
                    error=result.error,
                )

            if router is None:
                break

            next_agent = router(result.final_answer)
            if next_agent == "__END__" or next_agent not in self._agents:
                break

            # The next hop consumes the current hop's answer.
            task = result.final_answer
            current = next_agent

        return PipelineResult(
            success=True,
            steps=steps,
            final_output=pipeline_output,
            total_tokens=total_tokens,
            total_cost_usd=total_cost,
            total_duration_ms=total_ms,
        )

136 

137 

# ── ParallelPipeline — parallel fan-out ────────────────────────────

class ParallelPipeline:
    """Run several agents on the same task concurrently and merge their output.

    Every registered agent receives the same ``task`` and runs in its own
    worker thread; the answers are combined once all agents have finished.

    Usage::

        pp = ParallelPipeline()
        pp.add("analyst_1", agent_1)
        pp.add("analyst_2", agent_2)
        pp.add("analyst_3", agent_3)

        result = pp.run("Analyze the Q3 financial report",
                        aggregator=lambda results: "\\n---\\n".join(results.values()))
    """

    def __init__(self, max_workers: int = 5):
        """Create an empty pipeline using at most ``max_workers`` threads per run."""
        self._agents: dict[str, PipelineAgent] = {}
        self._max_workers = max_workers

    def add(self, name: str, agent: ToolAgent, config: AgentConfig | None = None):
        """Register ``agent`` under ``name``, replacing any agent with that name."""
        self._agents[name] = PipelineAgent(name=name, agent=agent, config=config)

    def run(self, task: str, aggregator: Callable[[dict[str, str]], str] | None = None) -> PipelineResult:
        """Run all registered agents on ``task`` in a thread pool.

        Args:
            task: Input passed unchanged to every agent.
            aggregator: Receives ``{agent_name: answer}`` in registration
                order and returns the merged output. When omitted, answers
                are joined as ``## name`` sections separated by blank lines.

        Returns:
            A ``PipelineResult`` with one step per agent that returned a
            result, listed in registration order. Tokens and cost are summed;
            the duration is the largest single-agent duration because the
            agents run concurrently. ``success`` is False when any agent
            raised or reported failure, and ``error`` lists each of them.
        """
        if not self._agents:
            return PipelineResult(success=False, error="No agents registered")

        results: dict[str, AgentResult] = {}
        errors: list[str] = []
        total_tokens = 0
        total_cost = 0.0
        total_ms = 0.0

        with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_workers) as pool:
            futures = {
                name: pool.submit(pa.agent.run, task)
                for name, pa in self._agents.items()
            }
            # Collect in registration order rather than completion order so
            # steps, aggregator input and merged output are deterministic.
            # All agents are already submitted, so they still run concurrently.
            for name, fut in futures.items():
                try:
                    result = fut.result()
                except Exception as exc:
                    # Best-effort fan-out: one crashing agent must not
                    # discard the answers of the others.
                    errors.append(f"{name}: {exc}")
                    continue
                results[name] = result
                total_tokens += result.total_tokens
                total_cost += result.total_cost_usd
                total_ms = max(total_ms, result.total_duration_ms)
                if not result.success:
                    # An agent that reports failure fails the pipeline, as in
                    # ConditionalPipeline; its output is still kept.
                    errors.append(f"{name}: {result.error}")

        if errors and not results:
            # Every agent raised, so there is nothing to aggregate.
            return PipelineResult(success=False, error="; ".join(errors))

        raw_outputs = {name: r.final_answer for name, r in results.items()}
        if aggregator:
            final = aggregator(raw_outputs)
        else:
            final = "\n\n".join(f"## {name}\n{out}" for name, out in raw_outputs.items())

        return PipelineResult(
            success=not errors,
            steps=[
                {
                    "agent": name,
                    "output": r.final_answer,
                    "tokens": r.total_tokens,
                    "cost": r.total_cost_usd,
                    "duration_ms": r.total_duration_ms,
                }
                for name, r in results.items()
            ],
            final_output=final,
            total_tokens=total_tokens,
            total_cost_usd=total_cost,
            total_duration_ms=total_ms,
            error="; ".join(errors),
        )

210 

211 

# ── RouterAgent — dynamic routing ──────────────────────────────────

# task → (next_agent_name, rewritten_task)
RouterFn = Callable[[str], tuple[str, str]]


class RouterAgent:
    """Dynamic router that picks a suitable agent based on the task content.

    A classifier agent first analyzes the task and names a route; the task is
    then handed to the agent registered for that route.

    Usage::

        ra = RouterAgent(classifier_agent)
        ra.register("code", code_agent, description="code generation / debugging tasks")
        ra.register("writing", writer_agent, description="writing / translation / summary tasks")
        ra.register("research", research_agent, description="research / search / analysis tasks")

        result = ra.run("Write a quicksort in Python for me")
    """

    def __init__(self, classifier: ToolAgent):
        """Create a router that uses ``classifier`` to choose among routes."""
        self._classifier = classifier
        self._routes: dict[str, tuple[ToolAgent, str]] = {}

    def register(self, name: str, agent: ToolAgent, description: str = ""):
        """Register ``agent`` as route ``name``; ``description`` is shown to the classifier."""
        self._routes[name] = (agent, description)

    def _match_route(self, answer: str) -> str:
        """Map the classifier's free-text answer to a registered route name.

        Matching is case-insensitive. An exact match wins; otherwise the
        longest route name contained in the answer wins, so a short name such
        as ``code`` cannot shadow ``code_review``. Ties go to the route
        registered first. When nothing matches, the first registered route is
        used as a fallback.
        """
        target = answer.strip().lower()
        for name in self._routes:
            if name.lower() == target:
                return name
        contained = [name for name in self._routes if name.lower() in target]
        if contained:
            # max() returns the first maximal item, preserving registration
            # order among names of equal length.
            return max(contained, key=len)
        return next(iter(self._routes))

    def run(self, task: str) -> PipelineResult:
        """Classify ``task``, then run it on the matching agent.

        Returns:
            A ``PipelineResult`` with a ``router`` step followed by the
            chosen agent's step. Totals cover both the classifier call and
            the chosen agent's call. ``success`` mirrors the chosen agent's
            result, and ``error`` carries its error when it failed.
            ``success`` is False with an error message when no routes are
            registered.
        """
        if not self._routes:
            return PipelineResult(success=False, error="No routes registered")

        # Build the classification prompt.
        route_desc = "\n".join(
            f"- {name}: {desc}" for name, (_, desc) in self._routes.items()
        )
        classify_task = (
            f"Analyze the following task and output ONLY the best matching route name "
            f"from the list below. Output just the name, nothing else.\n\n"
            f"Available routes:\n{route_desc}\n\n"
            f"Task: {task}\n\n"
            f"Route:"
        )

        class_result = self._classifier.run(classify_task)
        # Guard against a missing answer so matching falls back cleanly.
        matched = self._match_route(class_result.final_answer or "")

        agent, _ = self._routes[matched]
        result = agent.run(task)

        return PipelineResult(
            success=result.success,
            steps=[
                {
                    "agent": "router",
                    "output": f"Classified as: {matched}",
                    "tokens": class_result.total_tokens,
                },
                {"agent": matched, "output": result.final_answer, "tokens": result.total_tokens},
            ],
            final_output=result.final_answer,
            # The classifier call is part of the run, so it counts too; the
            # two calls are sequential, hence durations add up.
            total_tokens=class_result.total_tokens + result.total_tokens,
            total_cost_usd=class_result.total_cost_usd + result.total_cost_usd,
            total_duration_ms=class_result.total_duration_ms + result.total_duration_ms,
            error="" if result.success else result.error,
        )