Coverage for agentos/workflows/templates.py: 58%
125 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Workflow Templates — Declarative, reusable multi-step agent workflows.
4Define workflows as YAML/JSON templates with conditional branching,
5parallel execution, retry policies, and human-in-the-loop checkpoints.
6"""
8from __future__ import annotations
10import json
11import yaml
12from dataclasses import dataclass, field
13from enum import Enum
14from typing import Any, Optional
class StepType(Enum):
    """Enumeration of the kinds of step a workflow can contain.

    Each member's value is the lowercase string form of its name.
    """

    # Work performed by an agent or a tool.
    AGENT = "agent"
    TOOL = "tool"

    # Branching and fan-out.
    CONDITION = "condition"
    PARALLEL = "parallel"

    # Remaining step kinds.
    HUMAN_REVIEW = "human_review"
    TRANSFORM = "transform"
    WAIT = "wait"
class RetryPolicy(Enum):
    """Enumeration of the retry strategies a step can declare.

    Each member's value is the lowercase string form of its name.
    """

    NONE = "none"
    LINEAR = "linear"
    EXPONENTIAL = "exponential"
@dataclass
class WorkflowStep:
    """Single step in a workflow template.

    The fields are grouped below by the kind of step they are meant for.
    Every field except ``name`` has a default, so a step can be built from
    just a name plus whichever settings are relevant.
    """

    name: str
    step_type: StepType = StepType.AGENT
    description: str = ""

    # -- Agent / tool step ---------------------------------------------
    agent_type: str = "default"
    task_template: str = ""
    tool_name: str = ""

    # -- Condition step ------------------------------------------------
    # Python expression evaluated with step outputs as variables.
    condition: str = ""
    then_steps: list["WorkflowStep"] = field(default_factory=list)
    else_steps: list["WorkflowStep"] = field(default_factory=list)

    # -- Parallel step -------------------------------------------------
    sub_steps: list["WorkflowStep"] = field(default_factory=list)
    max_concurrency: int = 5

    # -- Human review --------------------------------------------------
    review_prompt: str = ""
    timeout_minutes: int = 30

    # -- Retry ---------------------------------------------------------
    retry_policy: RetryPolicy = RetryPolicy.NONE
    max_retries: int = 3
    retry_delay_seconds: float = 1.0

    # -- Transform -----------------------------------------------------
    # Python expression to transform output.
    transform_expr: str = ""

    # -- Input / output ------------------------------------------------
    depends_on: list[str] = field(default_factory=list)
    # Store output under this key for downstream steps.
    output_key: str = ""
@dataclass
class WorkflowTemplate:
    """
    Declarative workflow template.

    A template is a named, versioned list of :class:`WorkflowStep` objects
    that can be converted to and from plain dicts, YAML and JSON.

    Example (YAML)::

        name: research_report
        description: Research a topic and generate a report
        steps:
          - name: research
            step_type: agent
            agent_type: researcher
            task_template: "Research: {{input.topic}}"
            output_key: research_result
          - name: review
            step_type: human_review
            review_prompt: "Review the research: {{research_result}}"
            depends_on: [research]
          - name: write_report
            step_type: agent
            agent_type: writer
            task_template: "Write report based on: {{research_result}}"
            depends_on: [review]
            output_key: final_report
    """

    name: str
    description: str = ""
    version: str = "1.0"
    steps: list[WorkflowStep] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------

    def to_dict(self) -> dict[str, Any]:
        """Serialize the template, including all nested steps, to a dict."""
        return {
            "name": self.name,
            "description": self.description,
            "version": self.version,
            "steps": [self._step_to_dict(s) for s in self.steps],
            "metadata": self.metadata,
        }

    def _step_to_dict(self, step: WorkflowStep) -> dict[str, Any]:
        """Serialize one step (recursively) to a dict.

        ``name``, ``step_type`` and ``description`` are always written; the
        remaining keys are written only when they differ from the default
        that ``_step_from_dict`` would restore, which keeps the output
        compact while still round-tripping.
        """
        out: dict[str, Any] = {
            "name": step.name,
            "step_type": step.step_type.value,
            "description": step.description,
        }
        if step.agent_type != "default":
            out["agent_type"] = step.agent_type

        # Plain fields whose default is an empty (falsy) value.
        for key in ("task_template", "tool_name", "output_key", "condition", "depends_on"):
            value = getattr(step, key)
            if value:
                out[key] = value

        # Nested step lists are serialized recursively.
        if step.then_steps:
            out["then_steps"] = [self._step_to_dict(s) for s in step.then_steps]
        if step.else_steps:
            out["else_steps"] = [self._step_to_dict(s) for s in step.else_steps]
        if step.sub_steps:
            out["sub_steps"] = [self._step_to_dict(s) for s in step.sub_steps]
            out["max_concurrency"] = step.max_concurrency

        # Human-review and transform settings. These used to be omitted,
        # so they were silently lost on every dump/load round trip.
        if step.review_prompt:
            out["review_prompt"] = step.review_prompt
        if step.timeout_minutes != 30:  # 30 is the default restored on load
            out["timeout_minutes"] = step.timeout_minutes
        if step.transform_expr:
            out["transform_expr"] = step.transform_expr

        if step.retry_policy != RetryPolicy.NONE:
            out["retry_policy"] = step.retry_policy.value
            out["max_retries"] = step.max_retries
            # Written with the other retry settings so the delay survives
            # a round trip as well.
            out["retry_delay_seconds"] = step.retry_delay_seconds
        return out

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "WorkflowTemplate":
        """Deserialize a template from a dict.

        Raises:
            KeyError: if ``data`` (or any step in it) has no ``"name"``.
            ValueError: if a step has an unknown ``step_type`` or
                ``retry_policy`` value.
        """
        return cls(
            name=data["name"],
            description=data.get("description", ""),
            version=data.get("version", "1.0"),
            steps=[cls._step_from_dict(s) for s in data.get("steps", [])],
            metadata=data.get("metadata", {}),
        )

    @classmethod
    def _step_from_dict(cls, data: dict[str, Any]) -> WorkflowStep:
        """Build one step (recursively) from its dict form.

        Missing keys fall back to the same defaults ``WorkflowStep``
        declares.
        """

        def children(key: str) -> list[WorkflowStep]:
            return [cls._step_from_dict(child) for child in data.get(key, [])]

        return WorkflowStep(
            name=data["name"],
            step_type=StepType(data.get("step_type", "agent")),
            description=data.get("description", ""),
            agent_type=data.get("agent_type", "default"),
            task_template=data.get("task_template", ""),
            tool_name=data.get("tool_name", ""),
            output_key=data.get("output_key", ""),
            condition=data.get("condition", ""),
            depends_on=data.get("depends_on", []),
            then_steps=children("then_steps"),
            else_steps=children("else_steps"),
            sub_steps=children("sub_steps"),
            max_concurrency=data.get("max_concurrency", 5),
            # Previously ignored on load, so e.g. the ``review_prompt`` in
            # the class-level YAML example was dropped.
            review_prompt=data.get("review_prompt", ""),
            timeout_minutes=data.get("timeout_minutes", 30),
            transform_expr=data.get("transform_expr", ""),
            retry_policy=RetryPolicy(data.get("retry_policy", "none")),
            max_retries=data.get("max_retries", 3),
            retry_delay_seconds=data.get("retry_delay_seconds", 1.0),
        )

    def to_yaml(self) -> str:
        """Export workflow as a YAML string (block style, keys in dict order)."""
        return yaml.dump(self.to_dict(), default_flow_style=False, sort_keys=False)

    def to_json(self, indent: int = 2) -> str:
        """Export workflow as a JSON string; non-ASCII text is kept as-is."""
        return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)

    @classmethod
    def from_yaml(cls, yaml_str: str) -> "WorkflowTemplate":
        """Load workflow from a YAML string (parsed with ``yaml.safe_load``)."""
        return cls.from_dict(yaml.safe_load(yaml_str))

    @classmethod
    def from_json(cls, json_str: str) -> "WorkflowTemplate":
        """Load workflow from a JSON string."""
        return cls.from_dict(json.loads(json_str))

    # ------------------------------------------------------------------
    # Step lookup
    # ------------------------------------------------------------------

    def get_step(self, name: str) -> Optional[WorkflowStep]:
        """Find a step by name, searching nested steps too.

        Returns the first match in depth-first order, or ``None``.
        """
        for step in self.steps:
            found = self._find_step(step, name)
            if found is not None:
                return found
        return None

    def _find_step(self, step: WorkflowStep, name: str) -> Optional[WorkflowStep]:
        """Depth-first search of ``step`` and its nested steps for ``name``."""
        if step.name == name:
            return step
        for child in step.then_steps + step.else_steps + step.sub_steps:
            found = self._find_step(child, name)
            if found is not None:
                return found
        return None

    def flatten_steps(self) -> list[WorkflowStep]:
        """Return every step, nested ones included, as one flat list.

        Each step is followed by its ``then_steps``, ``else_steps`` and
        ``sub_steps`` (in that order), recursively.
        """
        result: list[WorkflowStep] = []
        for step in self.steps:
            self._flatten(step, result)
        return result

    def _flatten(self, step: WorkflowStep, result: list[WorkflowStep]) -> None:
        """Append ``step`` and then all of its nested steps to ``result``."""
        result.append(step)
        for child in step.then_steps + step.else_steps + step.sub_steps:
            self._flatten(child, result)

    @property
    def step_count(self) -> int:
        """Total number of steps, counting nested ones."""
        return len(self.flatten_steps())
# ---- Built-in workflow templates ----

# Registry of ready-made templates, keyed by template name.
BUILTIN_TEMPLATES: dict[str, WorkflowTemplate] = {}
def _init_builtins() -> None:
    """Register the built-in workflow templates in ``BUILTIN_TEMPLATES``.

    Two templates are registered, each under its own ``name``:

    * ``research_report`` -- research -> summarize -> report, all agent
      steps.
    * ``code_review`` -- agent review -> human approval -> agent fix (with
      exponential retry) -> ``run_tests`` tool.

    Calling this again overwrites the same two keys.
    """
    # NOTE: the prompt templates below use real newlines ("\n"). They were
    # previously written with an escaped backslash, which put a literal
    # backslash followed by "n" into the prompt text instead of a line break.

    # Research -> Summarize -> Report
    BUILTIN_TEMPLATES["research_report"] = WorkflowTemplate(
        name="research_report",
        description="Research a topic, summarize findings, generate report",
        steps=[
            WorkflowStep(
                name="research",
                step_type=StepType.AGENT,
                agent_type="researcher",
                task_template="Deep research on: {{input.topic}}",
                output_key="research",
            ),
            WorkflowStep(
                name="summarize",
                step_type=StepType.AGENT,
                agent_type="summarizer",
                task_template="Summarize key findings from: {{research}}",
                depends_on=["research"],
                output_key="summary",
            ),
            WorkflowStep(
                name="report",
                step_type=StepType.AGENT,
                agent_type="writer",
                task_template=(
                    "Write a comprehensive report based on: {{research}}\n"
                    "Summary: {{summary}}"
                ),
                depends_on=["research", "summarize"],
                output_key="report",
            ),
        ],
    )

    # Code Review -> Fix -> Test
    BUILTIN_TEMPLATES["code_review"] = WorkflowTemplate(
        name="code_review",
        description="Review code, apply fixes, run tests",
        steps=[
            WorkflowStep(
                name="review",
                step_type=StepType.AGENT,
                agent_type="code_reviewer",
                task_template=(
                    "Review this code for bugs and improvements:\n"
                    "```\n"
                    "{{input.code}}\n"
                    "```"
                ),
                output_key="review_feedback",
            ),
            WorkflowStep(
                name="human_approval",
                step_type=StepType.HUMAN_REVIEW,
                review_prompt="Approve fixes based on: {{review_feedback}}",
                depends_on=["review"],
            ),
            WorkflowStep(
                name="apply_fixes",
                step_type=StepType.AGENT,
                agent_type="coder",
                task_template=(
                    "Apply fixes based on review:\n"
                    "{{review_feedback}}\n"
                    "\n"
                    "Original code:\n"
                    "```\n"
                    "{{input.code}}\n"
                    "```"
                ),
                depends_on=["human_approval"],
                output_key="fixed_code",
                retry_policy=RetryPolicy.EXPONENTIAL,
                max_retries=3,
            ),
            WorkflowStep(
                name="test",
                step_type=StepType.TOOL,
                tool_name="run_tests",
                depends_on=["apply_fixes"],
            ),
        ],
    )
# Populate BUILTIN_TEMPLATES once, when this module is imported.
_init_builtins()