Coverage for agentos/swarm/task_decomposer.py: 30%

86 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2v1.9.4: LLM-driven Task Decomposer. 

3 

4Splits complex tasks into sub-task DAGs with dependencies, 

5assigning each sub-task to appropriate agent roles. 

6""" 

7 

8from __future__ import annotations 

9 

10from dataclasses import dataclass, field 

11from typing import Any, Optional 

12 

13import json as _json 

14 

15 

# Prompt template for LLM-based decomposition.  It is rendered with
# str.format() and expects three placeholders: {task}, {agents} and
# {max_depth}.  The model is instructed to reply with a bare JSON array of
# sub-task objects (id, title, description, depends_on, agent_hint,
# expected_output).
_DECOMPOSE_PROMPT = """You are a task decomposition expert. Given a complex task, break it into
a sequence of sub-tasks that can be executed independently or sequentially.

Input task: {task}
Available agents: {agents}

Output a JSON array of sub-tasks. Each sub-task must have:
- "id": unique short string (e.g. "step_1")
- "title": human-readable title
- "description": what this sub-task should accomplish
- "depends_on": list of sub-task IDs that must complete before this one (empty list if none)
- "agent_hint": which agent role is best suited (from the available list, or "any")
- "expected_output": brief description of expected result

Rules:
1. First sub-tasks should have no dependencies
2. Each sub-task should be independently executable
3. Use at most {max_depth} levels of nesting
4. Sub-tasks should be concrete and actionable

Output ONLY the JSON array, no other text.
JSON:"""

38 

39 

@dataclass
class SubTask:
    """One node of the decomposition DAG.

    ``depends_on`` lists the ids of the sub-tasks this one waits for.
    ``status`` and ``output`` start as ``"pending"`` and ``None``.
    """

    id: str
    title: str
    description: str
    depends_on: list[str] = field(default_factory=list)
    agent_hint: str = "any"
    expected_output: str = ""
    status: str = "pending"  # pending | running | done | failed
    output: Any = None

    def to_dict(self) -> dict:
        """Return the sub-task as a plain dict.

        Every field except ``output`` is exported, in declaration order.
        The ``depends_on`` value is the instance's own list, not a copy.
        """
        exported = (
            "id",
            "title",
            "description",
            "depends_on",
            "agent_hint",
            "expected_output",
            "status",
        )
        return {name: getattr(self, name) for name in exported}

63 

64 

@dataclass
class Decomposition:
    """Outcome of decomposing one task into sub-tasks."""

    # Text identifying the task that was decomposed.
    original_task: str
    # Produced sub-tasks; every instance gets its own fresh list by default.
    sub_tasks: list[SubTask] = field(default_factory=list)
    # Step count as supplied by the creator; it defaults to 0 and is not
    # derived from ``sub_tasks`` automatically.
    total_steps: int = 0

72 

73 

class TaskDecomposer:
    """LLM-driven task decomposition engine.

    Breaks complex tasks into executable sub-task DAGs.  ``decompose`` first
    asks an OpenAI chat model for a JSON plan and, whenever that plan is
    unavailable or unusable, falls back to a local rule-based split.
    """

    # Phase table for the keyword fallback:
    # (phase id, sub-task description, trigger keywords).
    # Keywords are matched as plain substrings of the lower-cased task text.
    _PHASES = (
        ("search", "Search and gather information",
         ("search", "find", "搜索", "查找")),
        ("analyze", "Analyze collected information",
         ("analyze", "analysis", "分析", "处理")),
        ("generate", "Generate final output",
         ("write", "generate", "create", "写", "生成", "创建")),
        ("implement", "Implement the solution",
         ("code", "implement", "build", "代码", "实现", "开发")),
        ("verify", "Verify and validate results",
         ("test", "verify", "validate", "测试", "验证")),
    )

    def __init__(self, max_depth: int = 4, llm_model: str = "gpt-4o-mini"):
        """Store the nesting limit put into the prompt and the model name."""
        self.max_depth = max_depth
        self._llm_model = llm_model

    def decompose(
        self,
        task: str,
        agents: list[str] | None = None,
    ) -> Decomposition:
        """Decompose a complex task into sub-tasks.

        Args:
            task: The full task description
            agents: List of available agent names for role assignment;
                ``["general"]`` is used when omitted or empty

        Returns:
            Decomposition with ordered sub-tasks.  ``original_task`` always
            holds ``task`` itself, whichever strategy produced the plan.
        """
        agents_list = agents or ["general"]
        prompt = _DECOMPOSE_PROMPT.format(
            task=task,
            agents=", ".join(agents_list),
            max_depth=self.max_depth,
        )

        # Try LLM-based decomposition first, fall back to rule-based.  A plan
        # with no sub-tasks is unusable, so it is treated like a failure.
        result = self._llm_decompose(prompt)
        if result is not None and result.sub_tasks:
            # _llm_decompose only sees the rendered prompt; record the
            # caller's task text so both strategies report the same thing.
            result.original_task = task
            return result

        return self._fallback_decompose(task, agents_list)

    def _llm_decompose(self, prompt: str) -> Decomposition | None:
        """Use LLM to decompose task. Returns None on failure.

        Failure covers: no ``OPENAI_API_KEY`` in the environment, a non-200
        HTTP status, a reply without a parseable JSON array, array items that
        are not objects, and any exception raised along the way.

        Args:
            prompt: Fully rendered prompt sent as the single user message.

        Returns:
            A Decomposition whose ``original_task`` is ``prompt``, or None.
        """
        try:
            import os

            api_key = os.environ.get("OPENAI_API_KEY", "")
            if not api_key:
                return None

            # Function-scope import: a missing ``requests`` raises ImportError
            # here, which the handler below turns into None.
            import requests

            resp = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": self._llm_model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.2,
                    "max_tokens": 1000,
                },
                timeout=30,
            )
            if resp.status_code != 200:
                return None

            text = resp.json()["choices"][0]["message"]["content"]
            data = self._extract_json_array(text)
            if data is None:
                return None

            sub_tasks = []
            for item in data:
                if not isinstance(item, dict):
                    return None
                # The model may send null or a bare string here; normalise to
                # the list[str] that SubTask.depends_on is declared as.
                deps = item.get("depends_on") or []
                if isinstance(deps, str):
                    deps = [deps]
                sub_tasks.append(
                    SubTask(
                        id=str(item.get("id") or f"step_{len(sub_tasks) + 1}"),
                        title=item.get("title", ""),
                        description=item.get("description", ""),
                        depends_on=[str(dep) for dep in deps],
                        agent_hint=item.get("agent_hint", "any"),
                        expected_output=item.get("expected_output", ""),
                    )
                )

            return Decomposition(
                original_task=prompt,
                sub_tasks=sub_tasks,
                total_steps=len(sub_tasks),
            )
        except Exception:
            # Deliberate best-effort: any failure means "use the fallback",
            # but leave a trace for debugging instead of swallowing silently.
            import logging

            logging.getLogger(__name__).debug(
                "LLM decomposition failed; using rule-based fallback",
                exc_info=True,
            )
            return None

    @staticmethod
    def _extract_json_array(text):
        """Return the JSON array embedded in ``text``, or None if there is none.

        The candidate span runs from the first ``[`` to the last ``]``, which
        tolerates prose or code fences around the array.
        """
        if not isinstance(text, str):
            return None
        start = text.find("[")
        end = text.rfind("]") + 1
        if start == -1 or end == 0:
            return None
        try:
            data = _json.loads(text[start:end])
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return None
        return data if isinstance(data, list) else None

    @staticmethod
    def _split_on_markers(task: str) -> list[str]:
        """Split ``task`` on explicit step markers; return non-empty pieces.

        Markers are ``Step N.`` / ``Step N:``, ``N)``, the word ``then``, the
        Chinese connectives 之后 / 然后 / 接着, and ``;``.  ``Step`` and
        ``then`` only count when not glued to other ASCII letters, so words
        such as "authentication" or "Substep" are left intact.
        """
        import re

        pieces = re.split(
            r'(?:(?<![A-Za-z])Step\s*\d+[.:]\s*'
            r'|\d+\)\s*'
            # \uff0c is the fullwidth comma commonly following 然后 / 之后.
            r'|(?:(?<![A-Za-z])then(?![A-Za-z])|之后|然后|接着)[,\uff0c\s]*'
            r'|;\s*)',
            task,
            flags=re.IGNORECASE,
        )
        return [piece.strip() for piece in pieces if piece.strip()]

    def _fallback_decompose(
        self, task: str, agents: list[str]
    ) -> Decomposition:
        """Rule-based fallback when LLM unavailable.

        Splits on explicit delimiters ('then', numbered steps, ';') into a
        linear chain of steps, or, when the task has no such delimiters, uses
        keyword-based phase decomposition.  A task matching no keyword becomes
        a single "execute" sub-task.  Every sub-task is hinted to the first
        agent in ``agents`` ("any" if the list is empty).
        """
        agent_hint = agents[0] if agents else "any"

        parts = self._split_on_markers(task)
        if len(parts) > 1:
            # Each step depends on the one right before it.
            sub_tasks = [
                SubTask(
                    id=f"step_{i}",
                    title=desc[:50],
                    description=desc,
                    depends_on=[f"step_{i - 1}"] if i > 1 else [],
                    agent_hint=agent_hint,
                )
                for i, desc in enumerate(parts, start=1)
            ]
            return Decomposition(
                original_task=task,
                sub_tasks=sub_tasks,
                total_steps=len(sub_tasks),
            )

        # Single task — keyword-based phase decomposition
        task_lower = task.lower()
        phases = [
            (pid, desc)
            for pid, desc, keywords in self._PHASES
            if any(k in task_lower for k in keywords)
        ]
        if not phases:
            phases = [("execute", task)]

        sub_tasks = []
        for i, (pid, desc) in enumerate(phases, start=1):
            sub_tasks.append(
                SubTask(
                    id=f"phase_{i}_{pid}",
                    title=pid.capitalize(),
                    description=desc,
                    # Phases run in table order, each after the previous one.
                    depends_on=[sub_tasks[-1].id] if sub_tasks else [],
                    agent_hint=agent_hint,
                )
            )

        return Decomposition(
            original_task=task,
            sub_tasks=sub_tasks,
            total_steps=len(sub_tasks),
        )