Coverage for agentos/subagent/manager.py: 33%
133 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2子Agent管理 — Fork隔离 + Swarm并行 + A2A委派 + 父子通信。
3基因来源: Claude Code (Fork) + Cursor (Swarm)
4v1.3.15: +Parent-Child 通信(状态共享、心跳、生命周期)
5"""
7from __future__ import annotations
9import asyncio
10import time
11import uuid
12from dataclasses import dataclass, field
13from enum import Enum
14from typing import Any, Callable, Awaitable
16from .parent_child import (
17 ChildStatus,
18 ChildHeartbeat,
19 ChildInfo,
20 SharedState,
21 ChildContext,
22 ChildHandle,
23)
class SubAgentMode(str, Enum):
    """Execution modes a sub-agent can be launched in.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value (for example ``SubAgentMode.FORK == "fork"``).
    """

    FORK = "fork"    # value passed as ``mode`` for fork-spawned children
    SWARM = "swarm"  # value passed as ``mode`` for swarm-spawned children
    A2A = "a2a"      # agent-to-agent delegation
def _short_agent_id() -> str:
    """Return the first 8 hex characters of a freshly generated UUID4."""
    return uuid.uuid4().hex[:8]


@dataclass
class SubAgentSpec:
    """Specification of a single sub-agent to launch.

    Every field has a default, so a spec can be built from just the values
    the caller cares about.
    """

    # Short identifier, generated per instance when not supplied.
    id: str = field(default_factory=_short_agent_id)
    # Text of the task handed to the sub-agent.
    task: str = ""
    # Launch mode; defaults to fork.
    mode: SubAgentMode = SubAgentMode.FORK
    # Name of the model the sub-agent should use.
    model: str = "kimi-k2.6"
    # Upper bound on iterations (not enforced anywhere in this module).
    max_iterations: int = 50
    # Optional timeout; None means no timeout was requested.
    timeout: float | None = None
    # Heartbeat interval; presumably seconds -- confirm against ChildHandle.
    heartbeat_interval: float = 2.0
@dataclass
class SubAgentResult:
    """Outcome of one sub-agent run, as returned to the parent."""

    # Identifier of the sub-agent that produced this result.
    agent_id: str
    # Output text of the run (empty when the run failed or was cancelled mid-flight).
    output: str
    # Number of iterations reported for the run.
    iterations: int
    # Error description; None (or empty) means the run is treated as successful.
    error: str | None = None
    # Handle of the child that produced this result, when available.
    handle: ChildHandle | None = None

    def summarize(self) -> str:
        """Return a short, human-readable summary of this result.

        A truthy ``error`` yields a one-line failure message. Otherwise the
        summary reports the iteration count followed by at most the first
        500 characters of ``output``.
        """
        prefix = f"[SubAgent {self.agent_id}]"
        if self.error:
            return f"{prefix} FAILED: {self.error}"

        preview = self.output[:500]
        return f"{prefix} Completed in {self.iterations} steps.\nResult: {preview}"
class SubAgentManager:
    """Sub-agent manager: Fork / Swarm spawning plus parent-child communication.

    Every spawned child is tracked by a ``ChildHandle`` stored under its
    agent id, so the parent can look it up, inspect it, and control it.

    Usage::

        mgr = SubAgentManager()

        # Fork mode
        result = await mgr.spawn_fork("analyze this report")

        # Swarm mode
        results = await mgr.spawn_swarm(["task A", "task B"])

        # Control a sub-agent
        handle = mgr.get_handle(result.agent_id)
        await handle.pause()
        await handle.resume()
        await handle.cancel()
        status = handle.get_status()
    """

    # Maximum number of children launched by a single spawn_swarm() call.
    MAX_SWARM_SIZE = 8

    def __init__(self):
        """Create a manager with no tracked children and a fresh shared state."""
        self._agents: dict[str, ChildHandle] = {}
        self._shared_state = SharedState()  # global parent-child shared state

    @property
    def shared_state(self) -> SharedState:
        """The global state object shared between parent and children."""
        return self._shared_state

    @property
    def active_children(self) -> int:
        """Number of tracked children whose status is RUNNING or PAUSED."""
        return sum(
            1 for handle in self._agents.values()
            if handle.status in (ChildStatus.RUNNING, ChildStatus.PAUSED)
        )

    def get_handle(self, agent_id: str) -> ChildHandle | None:
        """Return the handle registered under *agent_id*, or None if unknown."""
        return self._agents.get(agent_id)

    def list_children(self) -> list[dict[str, Any]]:
        """Return ``get_status()`` of every tracked child, in registration order."""
        return [handle.get_status() for handle in self._agents.values()]

    async def cancel_all(self) -> None:
        """Call ``cancel()`` on every tracked child and wait for all of them.

        An exception raised by any single ``cancel()`` propagates to the
        caller (``asyncio.gather`` is used without ``return_exceptions``).
        """
        pending = [handle.cancel() for handle in self._agents.values()]
        await asyncio.gather(*pending)

    async def _run_child(
        self,
        spec: SubAgentSpec,
        run_func: Callable[[SubAgentSpec, ChildContext], Awaitable[tuple[str, int]]] | None,
        placeholder_output: str,
    ) -> SubAgentResult:
        """Register a child for *spec*, run it, and translate the outcome.

        Shared lifecycle for fork and swarm children, so both modes handle
        completion, cancellation and failure identically.

        Args:
            spec: Specification of the child (id, task, mode, timeouts).
            run_func: Coroutine function ``run_func(spec, ctx)`` returning
                ``(output, iterations)``. When falsy, nothing is executed
                and a placeholder result is returned.
            placeholder_output: Output text used when *run_func* is falsy.

        Returns:
            The result of the run. Errors raised by *run_func* are reported
            through ``SubAgentResult.error`` rather than re-raised.
        """
        handle = ChildHandle(
            agent_id=spec.id,
            task=spec.task,
            mode=spec.mode.value,
            timeout=spec.timeout,
            heartbeat_interval=spec.heartbeat_interval,
        )
        self._agents[spec.id] = handle
        ctx = handle.create_context()
        handle.info.status = ChildStatus.RUNNING

        if not run_func:
            # No executor supplied: mark the child done and return a stub.
            handle.info.status = ChildStatus.COMPLETED
            return SubAgentResult(
                agent_id=spec.id,
                output=placeholder_output,
                iterations=0,
                handle=handle,
            )

        try:
            output, iterations = await run_func(spec, ctx)
            if handle._cancel_flag:
                # The run returned normally but a cancel was requested
                # meanwhile: report it as cancelled, keeping partial output.
                handle.info.status = ChildStatus.CANCELLED
                return SubAgentResult(
                    agent_id=spec.id,
                    output=output,
                    iterations=iterations,
                    error="Cancelled by parent",
                    handle=handle,
                )
            await ctx.done(output)
            handle.info.output = output
            handle.info.iterations = iterations
            handle.info.status = ChildStatus.COMPLETED
            return SubAgentResult(
                agent_id=spec.id,
                output=output,
                iterations=iterations,
                handle=handle,
            )
        except asyncio.CancelledError:
            # CancelledError derives from BaseException, so the generic
            # handler below would not see it; without this branch the
            # handle would stay in RUNNING forever.
            handle.info.status = ChildStatus.CANCELLED
            return SubAgentResult(
                agent_id=spec.id,
                output="",
                iterations=handle.info.iterations,
                error="Cancelled by parent",
                handle=handle,
            )
        except Exception as exc:
            message = str(exc)
            await ctx.fail(message)
            handle.info.status = ChildStatus.FAILED
            handle.info.error = message
            return SubAgentResult(
                agent_id=spec.id,
                output="",
                iterations=handle.info.iterations,
                error=message,
                handle=handle,
            )

    async def spawn_fork(
        self,
        task: str,
        model: str = "kimi-k2.6",
        run_func: Callable[[SubAgentSpec, ChildContext], Awaitable[tuple[str, int]]] | None = None,
        timeout: float | None = None,
        heartbeat_interval: float = 2.0,
    ) -> SubAgentResult:
        """Fork mode: run one child and hand the parent only its result.

        Args:
            task: Task text for the child.
            model: Model name recorded on the child's spec.
            run_func: ``run_func(spec, ctx) -> (output, iterations)``. When
                omitted, a placeholder result is returned without running
                anything.
            timeout: Recorded on the spec and the handle. This method does
                not enforce it itself; ``monitor_heartbeats`` acts on it
                through ``handle.check_timeout()``.
            heartbeat_interval: Recorded on the spec and the handle.

        Returns:
            The child's result; failures and cancellation are reported via
            ``SubAgentResult.error`` instead of being raised.
        """
        spec = SubAgentSpec(
            task=task,
            mode=SubAgentMode.FORK,
            model=model,
            timeout=timeout,
            heartbeat_interval=heartbeat_interval,
        )
        return await self._run_child(
            spec, run_func, f"Fork agent would process: {task}"
        )

    async def spawn_swarm(
        self,
        tasks: list[str],
        model: str = "kimi-k2.6",
        run_func: Callable[[SubAgentSpec, ChildContext], Awaitable[tuple[str, int]]] | None = None,
        timeout: float | None = None,
        heartbeat_interval: float = 2.0,
    ) -> list[SubAgentResult]:
        """Swarm mode: run up to ``MAX_SWARM_SIZE`` children concurrently.

        Only the first ``MAX_SWARM_SIZE`` entries of *tasks* are launched;
        any further entries are ignored.

        Args:
            tasks: Task texts, one child per entry.
            model: Model name recorded on each child's spec.
            run_func: ``run_func(spec, ctx) -> (output, iterations)``,
                shared by all children. When omitted, placeholder results
                are returned.
            timeout: Recorded on each spec and handle (see ``spawn_fork``).
            heartbeat_interval: Recorded on each spec and handle.

        Returns:
            One result per launched child, in the order of *tasks*.
        """
        specs = [
            SubAgentSpec(
                task=task,
                mode=SubAgentMode.SWARM,
                model=model,
                timeout=timeout,
                heartbeat_interval=heartbeat_interval,
            )
            for task in tasks[:self.MAX_SWARM_SIZE]
        ]
        return await asyncio.gather(*[
            self._run_child(
                spec, run_func, f"Swarm agent would process: {spec.task}"
            )
            for spec in specs
        ])

    def split_task(self, task: str) -> list[str]:
        """Split a multi-line task into one sub-task per non-blank line.

        A task without a newline is returned unchanged as a single-element
        list (it is not stripped).
        """
        if "\n" in task:
            return [line.strip() for line in task.split("\n") if line.strip()]
        return [task]

    async def monitor_heartbeats(self, interval: float = 1.0) -> None:
        """Background coroutine that flags timed-out or unresponsive children.

        Runs forever; every *interval* seconds it inspects each child that
        is RUNNING or PAUSED. A child whose ``check_timeout()`` is true is
        cancelled and marked TIMEOUT; otherwise a child whose
        ``check_heartbeat_timeout()`` is true is marked FAILED.

        Usage::

            asyncio.create_task(mgr.monitor_heartbeats())
        """
        while True:
            await asyncio.sleep(interval)
            # Iterate over a snapshot: children may be registered or cleaned
            # up while this coroutine is suspended in cancel().
            for handle in list(self._agents.values()):
                if handle.status not in (ChildStatus.RUNNING, ChildStatus.PAUSED):
                    continue
                if handle.check_timeout():
                    await handle.cancel()
                    # NOTE(review): if cancel() sets the handle's cancel flag,
                    # the spawn path may later overwrite TIMEOUT with
                    # CANCELLED once run_func returns -- confirm intended.
                    handle.info.status = ChildStatus.TIMEOUT
                    handle.info.error = f"Timeout after {handle.info.timeout}s"
                elif handle.check_heartbeat_timeout():
                    handle.info.status = ChildStatus.FAILED
                    handle.info.error = "Heartbeat lost — child agent unresponsive"

    async def cleanup(self, max_age_seconds: float = 3600.0) -> int:
        """Drop handles in a terminal state that are older than the limit.

        A handle is removed when its status is COMPLETED, FAILED, CANCELLED
        or TIMEOUT and more than *max_age_seconds* have passed since
        ``handle.info.spawned_at`` (compared against ``time.time()``).

        Returns:
            The number of handles removed.
        """
        now = time.time()
        cleaned = 0
        terminal = (
            ChildStatus.COMPLETED,
            ChildStatus.FAILED,
            ChildStatus.CANCELLED,
            ChildStatus.TIMEOUT,
        )
        # Snapshot the items so entries can be deleted while looping.
        for agent_id, handle in list(self._agents.items()):
            if handle.status not in terminal:
                continue
            if now - handle.info.spawned_at > max_age_seconds:
                del self._agents[agent_id]
                cleaned += 1
        return cleaned