Coverage for agentos/workflows/engine.py: 62%
52 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v0.20 预设工作流模板。
3开箱即用的 Agent 协作模式。
4"""
6from __future__ import annotations
8from dataclasses import dataclass, field
9from enum import Enum
10from typing import Any, Callable
13class WorkflowType(str, Enum):
15 """工作流类型枚举。"""
17 CODE_REVIEW = "code_review"
18 RESEARCH = "research"
19 DEBATE = "debate"
20 QA = "qa"
21 CUSTOM = "custom"
24@dataclass
25class WorkflowStep:
26 """工作流步骤定义。"""
27 agent_role: str
28 instruction: str
29 input_from: int | None = None # 上一步的index,None=原始输入
30 parallel: bool = False
33@dataclass
34class Workflow:
35 """预设工作流定义。"""
37 name: str
38 workflow_type: WorkflowType
39 steps: list[WorkflowStep]
40 max_rounds: int = 5
41 auto_merge: bool = True
42 metadata: dict[str, Any] = field(default_factory=dict)
45# ── 内置工作流 ──────────────────────────────────
47CODE_REVIEW = Workflow(
48 name="代码审查",
49 workflow_type=WorkflowType.CODE_REVIEW,
50 steps=[
51 WorkflowStep("architect", "审查代码架构和设计模式"),
52 WorkflowStep("security_expert", "审查安全漏洞和注入风险"),
53 WorkflowStep("performance_expert", "审查性能瓶颈和资源消耗"),
54 WorkflowStep("reviewer", "综合以上意见,输出最终审查报告"),
55 ],
56 max_rounds=1,
57)
60RESEARCH = Workflow(
61 name="深度调研",
62 workflow_type=WorkflowType.RESEARCH,
63 steps=[
64 WorkflowStep("researcher", "搜索并收集相关资料", parallel=False),
65 WorkflowStep("analyst", "分析数据并提取关键insights", input_from=0),
66 WorkflowStep("synthesizer", "综合所有发现,撰写调研报告", input_from=1),
67 ],
68 max_rounds=1,
69)
72DEBATE = Workflow(
73 name="辩证讨论",
74 workflow_type=WorkflowType.DEBATE,
75 steps=[
76 WorkflowStep("proponent", "提出论点并给出论据"),
77 WorkflowStep("opponent", "反驳对方论点,指出逻辑漏洞", input_from=0),
78 WorkflowStep("judge", "综合双方观点,给出平衡结论", input_from=1),
79 ],
80 max_rounds=3,
81)
84QA = Workflow(
85 name="智能问答",
86 workflow_type=WorkflowType.QA,
87 steps=[
88 WorkflowStep("retriever", "从知识库检索相关信息"),
89 WorkflowStep("reasoner", "基于检索结果进行推理回答", input_from=0),
90 WorkflowStep("verifier", "验证答案准确性并修正", input_from=1),
91 ],
92 max_rounds=2,
93)
96BUILTIN_WORKFLOWS: dict[WorkflowType, Workflow] = {
97 WorkflowType.CODE_REVIEW: CODE_REVIEW,
98 WorkflowType.RESEARCH: RESEARCH,
99 WorkflowType.DEBATE: DEBATE,
100 WorkflowType.QA: QA,
101}
104class WorkflowEngine:
105 """工作流引擎 — 按预设步骤调度多个Agent协作。"""
107 def __init__(self, workflow: Workflow, agent_factory: Callable[[str], Any]):
108 self.workflow = workflow
109 self.agent_factory = agent_factory
110 self._results: dict[int, Any] = {}
112 async def execute(self, input_text: str, context: dict | None = None) -> str:
113 """执行工作流。"""
114 from agentos.models.router import ModelRouter
116 last_output = input_text
117 for round_idx in range(self.workflow.max_rounds):
118 for step_idx, step in enumerate(self.workflow.steps):
119 if step.input_from is not None:
120 feed = self._results.get(step.input_from, input_text)
121 else:
122 feed = input_text if round_idx == 0 else last_output
124 agent = self.agent_factory(step.agent_role)
125 full_prompt = f"""你是一名{step.agent_role}。
127输入内容:
128{feed}
130任务:
131{step.instruction}
133请直接给出你的分析和结论。"""
135 result = await agent.run(full_prompt, context=context or {})
136 self._results[step_idx] = result.get("output", str(result))
137 last_output = self._results[step_idx]
139 if self.workflow.auto_merge:
140 break # 单轮工作流
142 if self.workflow.auto_merge:
143 return self._results.get(len(self.workflow.steps) - 1, last_output)
145 return last_output