Coverage for agentos/subagent/manager.py: 33%

133 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 17:32 +0800

1""" 

2子Agent管理 — Fork隔离 + Swarm并行 + A2A委派 + 父子通信。 

3基因来源: Claude Code (Fork) + Cursor (Swarm) 

4v1.3.15: +Parent-Child 通信(状态共享、心跳、生命周期) 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio 

10import time 

11import uuid 

12from dataclasses import dataclass, field 

13from enum import Enum 

14from typing import Any, Callable, Awaitable 

15 

16from .parent_child import ( 

17 ChildStatus, 

18 ChildHeartbeat, 

19 ChildInfo, 

20 SharedState, 

21 ChildContext, 

22 ChildHandle, 

23) 

24 

25 

class SubAgentMode(str, Enum):
    """How a child agent is launched by the manager.

    Because the enum also subclasses ``str``, every member compares equal to
    its plain string value (e.g. ``SubAgentMode.FORK == "fork"``).
    """

    # Single child in a clean context.
    FORK = "fork"
    # Several children run concurrently.
    SWARM = "swarm"
    # Agent-to-agent delegation.
    A2A = "a2a"

31 

32 

def _new_agent_id() -> str:
    """Return a short random identifier: the first 8 hex chars of a UUID4."""
    return uuid.uuid4().hex[:8]


@dataclass
class SubAgentSpec:
    """Specification of a single child agent to be spawned."""

    # Fresh short id per instance (generated lazily via default_factory).
    id: str = field(default_factory=_new_agent_id)
    # Task text handed to the child.
    task: str = ""
    mode: SubAgentMode = SubAgentMode.FORK
    model: str = "kimi-k2.6"
    max_iterations: int = 50
    # None means no timeout is set on this spec.
    timeout: float | None = None
    # Seconds between expected child heartbeats.
    heartbeat_interval: float = 2.0

43 

44 

@dataclass
class SubAgentResult:
    """Outcome of one child-agent run, as returned to the parent."""

    agent_id: str
    output: str
    iterations: int
    # Truthy when the run failed or was cancelled.
    error: str | None = None
    # Handle of the child that produced this result, if any.
    handle: ChildHandle | None = None

    def summarize(self) -> str:
        """Return a short human-readable summary of this result.

        A truthy ``error`` yields a FAILED line. Otherwise the step count is
        reported together with at most the first 500 characters of ``output``.
        """
        prefix = f"[SubAgent {self.agent_id}]"
        if not self.error:
            preview = self.output[:500]
            return f"{prefix} Completed in {self.iterations} steps.\nResult: {preview}"
        return f"{prefix} FAILED: {self.error}"

61 

62 

class SubAgentManager:
    """Child-agent manager: Fork / Swarm / A2A modes plus parent-child communication.

    Usage::

        mgr = SubAgentManager()

        # Fork mode
        result = await mgr.spawn_fork("Analyze this report")

        # Swarm mode
        results = await mgr.spawn_swarm(["Task A", "Task B"])

        # Control a child agent
        handle = mgr.get_handle(result.agent_id)
        await handle.pause()
        await handle.resume()
        await handle.cancel()
        status = handle.get_status()
    """

    # Maximum number of children started by one spawn_swarm call; tasks
    # beyond this many are dropped.
    MAX_SWARM_SIZE = 8

    def __init__(self):
        # agent_id -> handle for every child spawned by this manager.
        self._agents: dict[str, ChildHandle] = {}
        self._shared_state = SharedState()  # global parent-child shared state

    @property
    def shared_state(self) -> SharedState:
        """Global parent-child shared state."""
        return self._shared_state

    @property
    def active_children(self) -> int:
        """Number of tracked children that are RUNNING or PAUSED."""
        return sum(
            1 for h in self._agents.values()
            if h.status in (ChildStatus.RUNNING, ChildStatus.PAUSED)
        )

    def get_handle(self, agent_id: str) -> ChildHandle | None:
        """Return the handle for ``agent_id``, or None if it is not tracked."""
        return self._agents.get(agent_id)

    def list_children(self) -> list[dict[str, Any]]:
        """Return ``get_status()`` for every tracked child."""
        return [h.get_status() for h in self._agents.values()]

    async def cancel_all(self) -> None:
        """Call ``cancel()`` on every tracked child concurrently."""
        await asyncio.gather(*(h.cancel() for h in self._agents.values()))

    async def _run_child(
        self,
        spec: SubAgentSpec,
        run_func: Callable[[SubAgentSpec, ChildContext], Awaitable[tuple[str, int]]] | None,
        placeholder: str,
    ) -> SubAgentResult:
        """Register a handle for ``spec``, run ``run_func`` and record the outcome.

        Shared by Fork and Swarm modes so that both handle success, parent
        cancellation, cancellation errors and exceptions identically.

        Args:
            spec: Specification of the child to run.
            run_func: ``run_func(spec, ctx) -> (output, iterations)``; when
                None the child is marked COMPLETED immediately.
            placeholder: Output reported when ``run_func`` is None.

        Returns:
            A SubAgentResult. Failures and cancellations are reported through
            ``error`` rather than raised.
        """
        handle = ChildHandle(
            agent_id=spec.id,
            task=spec.task,
            mode=spec.mode.value,
            timeout=spec.timeout,
            heartbeat_interval=spec.heartbeat_interval,
        )
        self._agents[spec.id] = handle
        ctx = handle.create_context()
        handle.info.status = ChildStatus.RUNNING

        if run_func is None:
            handle.info.status = ChildStatus.COMPLETED
            return SubAgentResult(
                agent_id=spec.id,
                output=placeholder,
                iterations=0,
                handle=handle,
            )

        try:
            output, iterations = await run_func(spec, ctx)
            # The parent may have requested cancellation while run_func was
            # finishing; honour it instead of reporting success.
            if handle._cancel_flag:
                handle.info.status = ChildStatus.CANCELLED
                return SubAgentResult(
                    agent_id=spec.id,
                    output=output,
                    iterations=iterations,
                    error="Cancelled by parent",
                    handle=handle,
                )
            await ctx.done(output)
            handle.info.output = output
            handle.info.iterations = iterations
            handle.info.status = ChildStatus.COMPLETED
            return SubAgentResult(
                agent_id=spec.id,
                output=output,
                iterations=iterations,
                handle=handle,
            )
        except asyncio.CancelledError:
            # CancelledError is a BaseException, so it must be caught
            # explicitly; otherwise the handle would stay RUNNING forever.
            # NOTE(review): the cancellation is swallowed and reported as a
            # result rather than re-raised, which also hides cancellation of
            # the awaiting task itself — confirm this is intended.
            handle.info.status = ChildStatus.CANCELLED
            return SubAgentResult(
                agent_id=spec.id,
                output="",
                iterations=handle.info.iterations,
                error="Cancelled by parent",
                handle=handle,
            )
        except Exception as e:
            await ctx.fail(str(e))
            handle.info.status = ChildStatus.FAILED
            handle.info.error = str(e)
            return SubAgentResult(
                agent_id=spec.id,
                output="",
                iterations=handle.info.iterations,
                error=str(e),
                handle=handle,
            )

    async def spawn_fork(
        self,
        task: str,
        model: str = "kimi-k2.6",
        run_func: Callable[[SubAgentSpec, ChildContext], Awaitable[tuple[str, int]]] | None = None,
        timeout: float | None = None,
        heartbeat_interval: float = 2.0,
    ) -> SubAgentResult:
        """Fork mode: run one child in a clean context; the parent gets only the result.

        run_func(spec, ctx) -> (output, iterations)
        """
        spec = SubAgentSpec(
            task=task,
            mode=SubAgentMode.FORK,
            model=model,
            timeout=timeout,
            heartbeat_interval=heartbeat_interval,
        )
        return await self._run_child(spec, run_func, f"Fork agent would process: {task}")

    async def spawn_swarm(
        self,
        tasks: list[str],
        model: str = "kimi-k2.6",
        run_func: Callable[[SubAgentSpec, ChildContext], Awaitable[tuple[str, int]]] | None = None,
        timeout: float | None = None,
        heartbeat_interval: float = 2.0,
    ) -> list[SubAgentResult]:
        """Swarm mode: run up to MAX_SWARM_SIZE (8) children concurrently.

        Only the first MAX_SWARM_SIZE entries of ``tasks`` are run; the rest
        are dropped. Results are returned in task order.
        """
        specs = [
            SubAgentSpec(
                task=task,
                mode=SubAgentMode.SWARM,
                model=model,
                timeout=timeout,
                heartbeat_interval=heartbeat_interval,
            )
            for task in tasks[:self.MAX_SWARM_SIZE]
        ]
        return await asyncio.gather(*(
            self._run_child(spec, run_func, f"Swarm agent would process: {spec.task}")
            for spec in specs
        ))

    def split_task(self, task: str) -> list[str]:
        """Split a multi-line task into non-blank, stripped lines.

        A task without a newline is returned unchanged as a one-item list.
        """
        if "\n" in task:
            return [t.strip() for t in task.split("\n") if t.strip()]
        return [task]

    async def monitor_heartbeats(self, interval: float = 1.0) -> None:
        """Background coroutine (never returns) that polls children every ``interval`` s.

        RUNNING/PAUSED children that exceed their timeout are cancelled and
        marked TIMEOUT. Those whose heartbeat has lapsed are marked FAILED.

        Usage::

            asyncio.create_task(mgr.monitor_heartbeats())
        """
        while True:
            await asyncio.sleep(interval)
            # Iterate over a snapshot: awaiting cancel() yields to other code
            # that may add or remove handles.
            for handle in list(self._agents.values()):
                if handle.status not in (ChildStatus.RUNNING, ChildStatus.PAUSED):
                    continue
                if handle.check_timeout():
                    await handle.cancel()
                    handle.info.status = ChildStatus.TIMEOUT
                    handle.info.error = f"Timeout after {handle.info.timeout}s"
                elif handle.check_heartbeat_timeout():
                    handle.info.status = ChildStatus.FAILED
                    handle.info.error = "Heartbeat lost — child agent unresponsive"

    async def cleanup(self, max_age_seconds: float = 3600.0) -> int:
        """Drop terminal-state handles older than ``max_age_seconds``; return how many.

        Terminal states are COMPLETED, FAILED, CANCELLED and TIMEOUT. Age is
        ``time.time() - handle.info.spawned_at`` (presumably epoch seconds —
        confirm against ChildInfo).
        """
        now = time.time()
        cleaned = 0
        terminal = (ChildStatus.COMPLETED, ChildStatus.FAILED,
                    ChildStatus.CANCELLED, ChildStatus.TIMEOUT)
        for agent_id, handle in list(self._agents.items()):
            if handle.status in terminal:
                age = now - handle.info.spawned_at
                if age > max_age_seconds:
                    del self._agents[agent_id]
                    cleaned += 1
        return cleaned