Coverage for agentos/swarm/task_decomposer.py: 30%
86 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2v1.9.4: LLM-driven Task Decomposer.
4Splits complex tasks into sub-task DAGs with dependencies,
5assigning each sub-task to appropriate agent roles.
6"""
8from __future__ import annotations
10from dataclasses import dataclass, field
11from typing import Any, Optional
13import json as _json
16_DECOMPOSE_PROMPT = """You are a task decomposition expert. Given a complex task, break it into
17a sequence of sub-tasks that can be executed independently or sequentially.
19Input task: {task}
20Available agents: {agents}
22Output a JSON array of sub-tasks. Each sub-task must have:
23- "id": unique short string (e.g. "step_1")
24- "title": human-readable title
25- "description": what this sub-task should accomplish
26- "depends_on": list of sub-task IDs that must complete before this one (empty list if none)
27- "agent_hint": which agent role is best suited (from the available list, or "any")
28- "expected_output": brief description of expected result
30Rules:
311. First sub-tasks should have no dependencies
322. Each sub-task should be independently executable
333. Use at most {max_depth} levels of nesting
344. Sub-tasks should be concrete and actionable
36Output ONLY the JSON array, no other text.
37JSON:"""
40@dataclass
41class SubTask:
42 """A single sub-task in the decomposition DAG."""
44 id: str
45 title: str
46 description: str
47 depends_on: list[str] = field(default_factory=list)
48 agent_hint: str = "any"
49 expected_output: str = ""
50 status: str = "pending" # pending | running | done | failed
51 output: Any = None
53 def to_dict(self) -> dict:
54 return {
55 "id": self.id,
56 "title": self.title,
57 "description": self.description,
58 "depends_on": self.depends_on,
59 "agent_hint": self.agent_hint,
60 "expected_output": self.expected_output,
61 "status": self.status,
62 }
65@dataclass
66class Decomposition:
67 """Result of task decomposition."""
69 original_task: str
70 sub_tasks: list[SubTask] = field(default_factory=list)
71 total_steps: int = 0
74class TaskDecomposer:
75 """LLM-driven task decomposition engine.
77 Breaks complex tasks into executable sub-task DAGs.
78 """
80 def __init__(self, max_depth: int = 4, llm_model: str = "gpt-4o-mini"):
81 self.max_depth = max_depth
82 self._llm_model = llm_model
84 def decompose(
85 self,
86 task: str,
87 agents: list[str] | None = None,
88 ) -> Decomposition:
89 """Decompose a complex task into sub-tasks.
91 Args:
92 task: The full task description
93 agents: List of available agent names for role assignment
95 Returns:
96 Decomposition with ordered sub-tasks
97 """
98 agents_list = agents or ["general"]
99 agent_str = ", ".join(agents_list)
101 prompt = _DECOMPOSE_PROMPT.format(
102 task=task,
103 agents=agent_str,
104 max_depth=self.max_depth,
105 )
107 # Try LLM-based decomposition first, fall back to rule-based
108 result = self._llm_decompose(prompt)
109 if result:
110 return result
112 return self._fallback_decompose(task, agents_list)
114 def _llm_decompose(self, prompt: str) -> Decomposition | None:
115 """Use LLM to decompose task. Returns None on failure."""
116 try:
117 import os
118 api_key = os.environ.get("OPENAI_API_KEY", "")
119 if not api_key:
120 return None
122 import requests
123 resp = requests.post(
124 "https://api.openai.com/v1/chat/completions",
125 headers={"Authorization": f"Bearer {api_key}",
126 "Content-Type": "application/json"},
127 json={
128 "model": self._llm_model,
129 "messages": [{"role": "user", "content": prompt}],
130 "temperature": 0.2,
131 "max_tokens": 1000,
132 },
133 timeout=30,
134 )
135 if resp.status_code != 200:
136 return None
138 text = resp.json()["choices"][0]["message"]["content"]
140 # Extract JSON from response
141 start = text.find("[")
142 end = text.rfind("]") + 1
143 if start == -1 or end == 0:
144 return None
146 data = _json.loads(text[start:end])
147 sub_tasks = []
148 for item in data:
149 st = SubTask(
150 id=item.get("id", f"step_{len(sub_tasks)+1}"),
151 title=item.get("title", ""),
152 description=item.get("description", ""),
153 depends_on=item.get("depends_on", []),
154 agent_hint=item.get("agent_hint", "any"),
155 expected_output=item.get("expected_output", ""),
156 )
157 sub_tasks.append(st)
159 return Decomposition(
160 original_task=prompt,
161 sub_tasks=sub_tasks,
162 total_steps=len(sub_tasks),
163 )
164 except Exception:
165 return None
167 def _fallback_decompose(
168 self, task: str, agents: list[str]
169 ) -> Decomposition:
170 """Rule-based fallback when LLM unavailable.
172 Splits on explicit delimiters ('then', 'after', numbered steps)
173 or uses keyword-based phase decomposition.
174 """
175 import re
177 # Try to split on explicit markers
178 markers = re.split(
179 r'(?:Step\s*\d+[.:]\s*|\d+\)\s*|(?:then|之后|然后|接着)[,,\s]*|;\s*)',
180 task, flags=re.IGNORECASE,
181 )
182 markers = [m.strip() for m in markers if m.strip()]
184 if len(markers) > 1:
185 sub_tasks = []
186 for i, desc in enumerate(markers):
187 st = SubTask(
188 id=f"step_{i+1}",
189 title=desc[:50],
190 description=desc,
191 depends_on=[f"step_{i}"] if i > 0 else [],
192 agent_hint=agents[0] if agents else "any",
193 )
194 sub_tasks.append(st)
195 return Decomposition(
196 original_task=task,
197 sub_tasks=sub_tasks,
198 total_steps=len(sub_tasks),
199 )
201 # Single task — keyword-based phase decomposition
202 phases = []
203 task_lower = task.lower()
205 if any(k in task_lower for k in ("search", "find", "search for", "搜索", "查找")):
206 phases.append(("search", "Search and gather information"))
207 if any(k in task_lower for k in ("analyze", "analysis", "分析", "处理")):
208 phases.append(("analyze", "Analyze collected information"))
209 if any(k in task_lower for k in ("write", "generate", "create", "写", "生成", "创建")):
210 phases.append(("generate", "Generate final output"))
211 if any(k in task_lower for k in ("code", "implement", "build", "代码", "实现", "开发")):
212 phases.append(("implement", "Implement the solution"))
213 if any(k in task_lower for k in ("test", "verify", "validate", "测试", "验证")):
214 phases.append(("verify", "Verify and validate results"))
216 if not phases:
217 phases = [("execute", task)]
219 sub_tasks = []
220 for i, (pid, desc) in enumerate(phases):
221 st = SubTask(
222 id=f"phase_{i+1}_{pid}",
223 title=pid.capitalize(),
224 description=desc,
225 depends_on=[sub_tasks[-1].id] if sub_tasks else [],
226 agent_hint=agents[0] if agents else "any",
227 )
228 sub_tasks.append(st)
230 return Decomposition(
231 original_task=task,
232 sub_tasks=sub_tasks,
233 total_steps=len(sub_tasks),
234 )