Coverage for agentos/workflows/templates.py: 58%

125 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Workflow Templates — Declarative, reusable multi-step agent workflows. 

3 

4Define workflows as YAML/JSON templates with conditional branching, 

5parallel execution, retry policies, and human-in-the-loop checkpoints. 

6""" 

7 

8from __future__ import annotations 

9 

10import json 

11import yaml 

12from dataclasses import dataclass, field 

13from enum import Enum 

14from typing import Any, Optional 

15 

16 

class StepType(Enum):
    """Enumeration of the step types a workflow step can have.

    Each member's value is the string stored under ``step_type`` when a
    step is serialized, and the string accepted when it is loaded back.
    """

    AGENT = "agent"
    TOOL = "tool"
    CONDITION = "condition"
    PARALLEL = "parallel"
    HUMAN_REVIEW = "human_review"
    TRANSFORM = "transform"
    WAIT = "wait"

28 

29 

class RetryPolicy(Enum):
    """Enumeration of the retry policies a workflow step can declare.

    Each member's value is the string stored under ``retry_policy`` when a
    step is serialized. How the delay between attempts is derived for each
    policy is decided by whatever executes the workflow, not by this enum.
    """

    NONE = "none"
    LINEAR = "linear"
    EXPONENTIAL = "exponential"

37 

38 

@dataclass
class WorkflowStep:
    """One step of a workflow template.

    A single dataclass carries the settings for every step type. The
    fields are grouped below by the kind of step they belong to; fields
    that are not set keep their defaults. Only ``name`` is required.
    """

    name: str
    step_type: StepType = StepType.AGENT
    description: str = ""

    # --- Agent / tool steps ---
    agent_type: str = "default"
    task_template: str = ""
    tool_name: str = ""

    # --- Condition steps ---
    # Python expression evaluated with step outputs as variables.
    condition: str = ""
    then_steps: list["WorkflowStep"] = field(default_factory=list)
    else_steps: list["WorkflowStep"] = field(default_factory=list)

    # --- Parallel steps ---
    sub_steps: list["WorkflowStep"] = field(default_factory=list)
    max_concurrency: int = 5

    # --- Human review steps ---
    review_prompt: str = ""
    timeout_minutes: int = 30

    # --- Retry settings ---
    retry_policy: RetryPolicy = RetryPolicy.NONE
    max_retries: int = 3
    retry_delay_seconds: float = 1.0

    # --- Transform steps ---
    # Python expression to transform output.
    transform_expr: str = ""

    # --- Input / output wiring ---
    depends_on: list[str] = field(default_factory=list)
    # Store output under this key for downstream steps.
    output_key: str = ""

80 

81 

@dataclass
class WorkflowTemplate:
    """
    Declarative workflow template.

    A template is a named, versioned list of :class:`WorkflowStep` objects
    plus free-form metadata. It converts to and from plain dicts, YAML and
    JSON, and offers helpers to look up or flatten its (possibly nested)
    steps.

    Example (YAML)::

        name: research_report
        description: Research a topic and generate a report
        steps:
          - name: research
            step_type: agent
            agent_type: researcher
            task_template: "Research: {{input.topic}}"
            output_key: research_result
          - name: review
            step_type: human_review
            review_prompt: "Review the research: {{research_result}}"
            depends_on: [research]
          - name: write_report
            step_type: agent
            agent_type: writer
            task_template: "Write report based on: {{research_result}}"
            depends_on: [review]
            output_key: final_report
    """

    name: str
    description: str = ""
    version: str = "1.0"
    steps: list[WorkflowStep] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the template to a plain dict.

        Returns:
            A dict with the keys ``name``, ``description``, ``version``,
            ``steps`` (a list of step dicts) and ``metadata``.
        """
        return {
            "name": self.name,
            "description": self.description,
            "version": self.version,
            "steps": [self._step_to_dict(s) for s in self.steps],
            # Shallow copy so edits to the result do not leak back into the
            # template.
            "metadata": dict(self.metadata) if self.metadata else {},
        }

    def _step_to_dict(self, step: WorkflowStep) -> dict[str, Any]:
        """Serialize one step, recursing into its nested steps.

        ``name``, ``step_type`` and ``description`` are always written.
        Every other field is written only when it differs from its default,
        so the output stays compact while still round-tripping through
        :meth:`_step_from_dict`.
        """
        d: dict[str, Any] = {
            "name": step.name,
            "step_type": step.step_type.value,
            "description": step.description,
        }
        if step.agent_type != "default":
            d["agent_type"] = step.agent_type
        if step.task_template:
            d["task_template"] = step.task_template
        if step.tool_name:
            d["tool_name"] = step.tool_name
        if step.output_key:
            d["output_key"] = step.output_key
        if step.condition:
            d["condition"] = step.condition
        if step.transform_expr:
            d["transform_expr"] = step.transform_expr
        if step.review_prompt:
            d["review_prompt"] = step.review_prompt
        if step.timeout_minutes != 30:
            d["timeout_minutes"] = step.timeout_minutes
        if step.depends_on:
            d["depends_on"] = step.depends_on
        if step.then_steps:
            d["then_steps"] = [self._step_to_dict(s) for s in step.then_steps]
        if step.else_steps:
            d["else_steps"] = [self._step_to_dict(s) for s in step.else_steps]
        if step.sub_steps:
            d["sub_steps"] = [self._step_to_dict(s) for s in step.sub_steps]
            d["max_concurrency"] = step.max_concurrency
        if step.retry_policy != RetryPolicy.NONE:
            d["retry_policy"] = step.retry_policy.value
            d["max_retries"] = step.max_retries
            d["retry_delay_seconds"] = step.retry_delay_seconds
        return d

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "WorkflowTemplate":
        """Build a template from a plain dict.

        Args:
            data: Mapping with a required ``name`` key and optional
                ``description``, ``version``, ``steps`` and ``metadata``.
                A ``steps`` or ``metadata`` value of ``None`` (what YAML
                yields for an empty key) is treated as empty.

        Raises:
            KeyError: If ``name`` is missing from the template or a step.
            ValueError: If a step has an unknown ``step_type`` or
                ``retry_policy`` value.
        """
        return cls(
            name=data["name"],
            description=data.get("description", ""),
            version=data.get("version", "1.0"),
            steps=[cls._step_from_dict(s) for s in data.get("steps") or []],
            # Copy so the template does not share state with the caller's
            # dict.
            metadata=dict(data.get("metadata") or {}),
        )

    @classmethod
    def _step_from_dict(cls, data: dict[str, Any]) -> WorkflowStep:
        """Build one step from its dict form, recursing into nested steps."""

        def nested(key: str) -> list[WorkflowStep]:
            # ``or []`` also covers a key that is present with a null value.
            return [cls._step_from_dict(s) for s in data.get(key) or []]

        return WorkflowStep(
            name=data["name"],
            step_type=StepType(data.get("step_type", "agent")),
            description=data.get("description", ""),
            agent_type=data.get("agent_type", "default"),
            task_template=data.get("task_template", ""),
            tool_name=data.get("tool_name", ""),
            output_key=data.get("output_key", ""),
            condition=data.get("condition", ""),
            transform_expr=data.get("transform_expr", ""),
            review_prompt=data.get("review_prompt", ""),
            timeout_minutes=data.get("timeout_minutes", 30),
            depends_on=data.get("depends_on", []),
            then_steps=nested("then_steps"),
            else_steps=nested("else_steps"),
            sub_steps=nested("sub_steps"),
            max_concurrency=data.get("max_concurrency", 5),
            retry_policy=RetryPolicy(data.get("retry_policy", "none")),
            max_retries=data.get("max_retries", 3),
            retry_delay_seconds=data.get("retry_delay_seconds", 1.0),
        )

    def to_yaml(self) -> str:
        """Export workflow as YAML string (block style, keys in dict order)."""
        return yaml.dump(self.to_dict(), default_flow_style=False, sort_keys=False)

    def to_json(self, indent: int = 2) -> str:
        """Export workflow as JSON string; non-ASCII text is kept as-is."""
        return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)

    @classmethod
    def from_yaml(cls, yaml_str: str) -> "WorkflowTemplate":
        """Load workflow from YAML string (parsed with ``yaml.safe_load``)."""
        data = yaml.safe_load(yaml_str)
        return cls.from_dict(data)

    @classmethod
    def from_json(cls, json_str: str) -> "WorkflowTemplate":
        """Load workflow from JSON string."""
        data = json.loads(json_str)
        return cls.from_dict(data)

    def get_step(self, name: str) -> Optional[WorkflowStep]:
        """Find a step by name (searches recursively).

        Returns:
            The first matching step in depth-first order, or ``None`` when
            no step has that name.
        """
        for step in self.steps:
            result = self._find_step(step, name)
            if result is not None:
                return result
        return None

    def _find_step(self, step: WorkflowStep, name: str) -> Optional[WorkflowStep]:
        """Depth-first search of ``step`` and its nested steps for ``name``."""
        if step.name == name:
            return step
        for sub in step.then_steps + step.else_steps + step.sub_steps:
            result = self._find_step(sub, name)
            if result is not None:
                return result
        return None

    def flatten_steps(self) -> list[WorkflowStep]:
        """Return all steps, including nested ones, in a flat list.

        Each step precedes its nested steps, which are visited in the order
        ``then_steps``, ``else_steps``, ``sub_steps``.
        """
        result: list[WorkflowStep] = []
        for step in self.steps:
            self._flatten(step, result)
        return result

    def _flatten(self, step: WorkflowStep, result: list[WorkflowStep]) -> None:
        """Append ``step`` and then all of its nested steps to ``result``."""
        result.append(step)
        for sub in step.then_steps + step.else_steps + step.sub_steps:
            self._flatten(sub, result)

    @property
    def step_count(self) -> int:
        """Total number of steps, counting nested steps."""
        return len(self.flatten_steps())

239 

240 

# ---- Built-in Workflow Templates ----

# Registry of ready-made templates, keyed by template name. It starts empty
# and is filled in by _init_builtins(), which this module calls at the bottom.
BUILTIN_TEMPLATES: dict[str, WorkflowTemplate] = {}

244 

245 

def _init_builtins() -> None:
    """Initialize built-in workflow templates.

    Registers two templates in ``BUILTIN_TEMPLATES``, replacing any entry
    already stored under the same key:

    * ``research_report``: research -> summarize -> report.
    * ``code_review``: review -> human approval -> apply fixes -> run tests.

    The multi-line task templates use real newline characters (``\\n`` escape
    in the literal) so the rendered prompt has line breaks around sections
    and code fences rather than a visible backslash followed by ``n``.
    """
    # Research → Summarize → Report
    BUILTIN_TEMPLATES["research_report"] = WorkflowTemplate(
        name="research_report",
        description="Research a topic, summarize findings, generate report",
        steps=[
            WorkflowStep(
                name="research",
                step_type=StepType.AGENT,
                agent_type="researcher",
                task_template="Deep research on: {{input.topic}}",
                output_key="research",
            ),
            WorkflowStep(
                name="summarize",
                step_type=StepType.AGENT,
                agent_type="summarizer",
                task_template="Summarize key findings from: {{research}}",
                depends_on=["research"],
                output_key="summary",
            ),
            WorkflowStep(
                name="report",
                step_type=StepType.AGENT,
                agent_type="writer",
                task_template=(
                    "Write a comprehensive report based on: {{research}}\n"
                    "Summary: {{summary}}"
                ),
                depends_on=["research", "summarize"],
                output_key="report",
            ),
        ],
    )

    # Code Review → Fix → Test
    BUILTIN_TEMPLATES["code_review"] = WorkflowTemplate(
        name="code_review",
        description="Review code, apply fixes, run tests",
        steps=[
            WorkflowStep(
                name="review",
                step_type=StepType.AGENT,
                agent_type="code_reviewer",
                task_template=(
                    "Review this code for bugs and improvements:\n"
                    "```\n"
                    "{{input.code}}\n"
                    "```"
                ),
                output_key="review_feedback",
            ),
            WorkflowStep(
                name="human_approval",
                step_type=StepType.HUMAN_REVIEW,
                review_prompt="Approve fixes based on: {{review_feedback}}",
                depends_on=["review"],
            ),
            WorkflowStep(
                name="apply_fixes",
                step_type=StepType.AGENT,
                agent_type="coder",
                task_template=(
                    "Apply fixes based on review:\n"
                    "{{review_feedback}}\n"
                    "\n"
                    "Original code:\n"
                    "```\n"
                    "{{input.code}}\n"
                    "```"
                ),
                depends_on=["human_approval"],
                output_key="fixed_code",
                retry_policy=RetryPolicy.EXPONENTIAL,
                max_retries=3,
            ),
            WorkflowStep(
                name="test",
                step_type=StepType.TOOL,
                tool_name="run_tests",
                depends_on=["apply_fixes"],
            ),
        ],
    )

315 

316 

# Populate BUILTIN_TEMPLATES when this module is imported.
_init_builtins()