Coverage for agentos/workflows/engine.py: 62%

52 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v0.20 预设工作流模板。 

3开箱即用的 Agent 协作模式。 

4""" 

5 

6from __future__ import annotations 

7 

8from dataclasses import dataclass, field 

9from enum import Enum 

10from typing import Any, Callable 

11 

12 

13class WorkflowType(str, Enum): 

14 

15 """工作流类型枚举。""" 

16 

17 CODE_REVIEW = "code_review" 

18 RESEARCH = "research" 

19 DEBATE = "debate" 

20 QA = "qa" 

21 CUSTOM = "custom" 

22 

23 

24@dataclass 

25class WorkflowStep: 

26 """工作流步骤定义。""" 

27 agent_role: str 

28 instruction: str 

29 input_from: int | None = None # 上一步的index,None=原始输入 

30 parallel: bool = False 

31 

32 

33@dataclass 

34class Workflow: 

35 """预设工作流定义。""" 

36 

37 name: str 

38 workflow_type: WorkflowType 

39 steps: list[WorkflowStep] 

40 max_rounds: int = 5 

41 auto_merge: bool = True 

42 metadata: dict[str, Any] = field(default_factory=dict) 

43 

44 

45# ── 内置工作流 ────────────────────────────────── 

46 

47CODE_REVIEW = Workflow( 

48 name="代码审查", 

49 workflow_type=WorkflowType.CODE_REVIEW, 

50 steps=[ 

51 WorkflowStep("architect", "审查代码架构和设计模式"), 

52 WorkflowStep("security_expert", "审查安全漏洞和注入风险"), 

53 WorkflowStep("performance_expert", "审查性能瓶颈和资源消耗"), 

54 WorkflowStep("reviewer", "综合以上意见,输出最终审查报告"), 

55 ], 

56 max_rounds=1, 

57) 

58 

59 

60RESEARCH = Workflow( 

61 name="深度调研", 

62 workflow_type=WorkflowType.RESEARCH, 

63 steps=[ 

64 WorkflowStep("researcher", "搜索并收集相关资料", parallel=False), 

65 WorkflowStep("analyst", "分析数据并提取关键insights", input_from=0), 

66 WorkflowStep("synthesizer", "综合所有发现,撰写调研报告", input_from=1), 

67 ], 

68 max_rounds=1, 

69) 

70 

71 

72DEBATE = Workflow( 

73 name="辩证讨论", 

74 workflow_type=WorkflowType.DEBATE, 

75 steps=[ 

76 WorkflowStep("proponent", "提出论点并给出论据"), 

77 WorkflowStep("opponent", "反驳对方论点,指出逻辑漏洞", input_from=0), 

78 WorkflowStep("judge", "综合双方观点,给出平衡结论", input_from=1), 

79 ], 

80 max_rounds=3, 

81) 

82 

83 

84QA = Workflow( 

85 name="智能问答", 

86 workflow_type=WorkflowType.QA, 

87 steps=[ 

88 WorkflowStep("retriever", "从知识库检索相关信息"), 

89 WorkflowStep("reasoner", "基于检索结果进行推理回答", input_from=0), 

90 WorkflowStep("verifier", "验证答案准确性并修正", input_from=1), 

91 ], 

92 max_rounds=2, 

93) 

94 

95 

96BUILTIN_WORKFLOWS: dict[WorkflowType, Workflow] = { 

97 WorkflowType.CODE_REVIEW: CODE_REVIEW, 

98 WorkflowType.RESEARCH: RESEARCH, 

99 WorkflowType.DEBATE: DEBATE, 

100 WorkflowType.QA: QA, 

101} 

102 

103 

104class WorkflowEngine: 

105 """工作流引擎 — 按预设步骤调度多个Agent协作。""" 

106 

107 def __init__(self, workflow: Workflow, agent_factory: Callable[[str], Any]): 

108 self.workflow = workflow 

109 self.agent_factory = agent_factory 

110 self._results: dict[int, Any] = {} 

111 

112 async def execute(self, input_text: str, context: dict | None = None) -> str: 

113 """执行工作流。""" 

114 from agentos.models.router import ModelRouter 

115 

116 last_output = input_text 

117 for round_idx in range(self.workflow.max_rounds): 

118 for step_idx, step in enumerate(self.workflow.steps): 

119 if step.input_from is not None: 

120 feed = self._results.get(step.input_from, input_text) 

121 else: 

122 feed = input_text if round_idx == 0 else last_output 

123 

124 agent = self.agent_factory(step.agent_role) 

125 full_prompt = f"""你是一名{step.agent_role}。 

126 

127输入内容: 

128{feed} 

129 

130任务: 

131{step.instruction} 

132 

133请直接给出你的分析和结论。""" 

134 

135 result = await agent.run(full_prompt, context=context or {}) 

136 self._results[step_idx] = result.get("output", str(result)) 

137 last_output = self._results[step_idx] 

138 

139 if self.workflow.auto_merge: 

140 break # 单轮工作流 

141 

142 if self.workflow.auto_merge: 

143 return self._results.get(len(self.workflow.steps) - 1, last_output) 

144 

145 return last_output