Coverage for agentos/subagent/parent_child.py: 47%

163 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 17:32 +0800

1""" 

2子Agent父子通信 — 状态共享、心跳、生命周期管理。 

3父Agent通过 ChildHandle 管控子Agent;子Agent通过 ChildContext 向父Agent报告。 

4""" 

5 

6from __future__ import annotations 

7 

8import asyncio 

9import time 

10import uuid 

11from dataclasses import dataclass, field 

12from enum import Enum 

13from typing import Any, Callable, Awaitable 

14 

15 

16class ChildStatus(str, Enum): 

17 """子Agent运行状态。""" 

18 IDLE = "idle" 

19 RUNNING = "running" 

20 PAUSED = "paused" 

21 COMPLETED = "completed" 

22 FAILED = "failed" 

23 CANCELLED = "cancelled" 

24 TIMEOUT = "timeout" 

25 

26 

27@dataclass 

28class ChildHeartbeat: 

29 """子Agent心跳包。""" 

30 agent_id: str 

31 status: ChildStatus = ChildStatus.RUNNING 

32 progress: float = 0.0 # 0.0 ~ 1.0 

33 current_step: str = "" 

34 message: str = "" 

35 iteration: int = 0 

36 timestamp: float = field(default_factory=time.time) 

37 

38 

39@dataclass 

40class ChildInfo: 

41 """子Agent元信息(父Agent侧)。""" 

42 agent_id: str 

43 task: str 

44 mode: str 

45 status: ChildStatus = ChildStatus.IDLE 

46 spawned_at: float = field(default_factory=time.time) 

47 last_heartbeat: float = field(default_factory=time.time) 

48 heartbeat_interval: float = 2.0 # 期望心跳间隔(秒) 

49 timeout: float | None = None # 超时(秒),None=无超时 

50 progress: float = 0.0 

51 current_step: str = "" 

52 iterations: int = 0 

53 error: str | None = None 

54 output: str = "" 

55 

56 

57class SharedState: 

58 """父子共享状态(线程安全)。""" 

59 

60 def __init__(self): 

61 self._lock = asyncio.Lock() 

62 self._data: dict[str, Any] = {} 

63 

64 async def set(self, key: str, value: Any) -> None: 

65 async with self._lock: 

66 self._data[key] = value 

67 

68 async def get(self, key: str, default: Any = None) -> Any: 

69 async with self._lock: 

70 return self._data.get(key, default) 

71 

72 async def update(self, mapping: dict[str, Any]) -> None: 

73 async with self._lock: 

74 self._data.update(mapping) 

75 

76 async def snapshot(self) -> dict[str, Any]: 

77 async with self._lock: 

78 return dict(self._data) 

79 

80 def set_sync(self, key: str, value: Any) -> None: 

81 """同步写(非协程场景)。""" 

82 self._data[key] = value 

83 

84 def get_sync(self, key: str, default: Any = None) -> Any: 

85 """同步读(非协程场景)。""" 

86 return self._data.get(key, default) 

87 

88 

89class ChildContext: 

90 """子Agent视角 — 向父Agent报告状态、检查控制信号。""" 

91 

92 def __init__( 

93 self, 

94 agent_id: str, 

95 heartbeat_callback: Callable[[ChildHeartbeat], Awaitable[None]] | None = None, 

96 on_cancel: Callable[[], bool] | None = None, 

97 on_pause: Callable[[], Awaitable[None]] | None = None, 

98 shared_state: SharedState | None = None, 

99 ): 

100 self.agent_id = agent_id 

101 self._heartbeat_cb = heartbeat_callback 

102 self._cancel_check = on_cancel or (lambda: False) 

103 self._pause_cb = on_pause or (lambda: asyncio.sleep(0)) 

104 self.shared_state = shared_state or SharedState() 

105 self._cancelled = False 

106 self._paused = False 

107 self._progress = 0.0 

108 self._current_step = "" 

109 self._iteration = 0 

110 

111 @property 

112 def cancelled(self) -> bool: 

113 return self._cancelled 

114 

115 @property 

116 def paused(self) -> bool: 

117 return self._paused 

118 

119 @property 

120 def progress(self) -> float: 

121 return self._progress 

122 

123 async def report_progress( 

124 self, 

125 progress: float, 

126 step: str = "", 

127 message: str = "", 

128 ) -> None: 

129 """子Agent报告进度。""" 

130 self._progress = max(0.0, min(1.0, progress)) 

131 self._current_step = step 

132 if self._heartbeat_cb: 

133 await self._heartbeat_cb(ChildHeartbeat( 

134 agent_id=self.agent_id, 

135 status=ChildStatus.RUNNING, 

136 progress=self._progress, 

137 current_step=step, 

138 message=message, 

139 iteration=self._iteration, 

140 )) 

141 

142 async def step(self, iteration: int, step: str = "") -> None: 

143 """子Agent标记一个执行步。""" 

144 self._iteration = iteration 

145 self._current_step = step 

146 

147 async def check_control(self) -> ChildStatus: 

148 """检查父Agent控制信号,返回应执行的操作。""" 

149 if self._cancel_check(): 

150 self._cancelled = True 

151 return ChildStatus.CANCELLED 

152 if self._paused: 

153 await self._pause_cb() 

154 return ChildStatus.PAUSED 

155 return ChildStatus.RUNNING 

156 

157 async def send_heartbeat(self, message: str = "") -> None: 

158 """子Agent发送心跳。""" 

159 if self._heartbeat_cb: 

160 await self._heartbeat_cb(ChildHeartbeat( 

161 agent_id=self.agent_id, 

162 status=ChildStatus.RUNNING, 

163 progress=self._progress, 

164 current_step=self._current_step, 

165 message=message, 

166 iteration=self._iteration, 

167 )) 

168 

169 async def done(self, output: str = "") -> None: 

170 """子Agent标记完成。""" 

171 if self._heartbeat_cb: 

172 await self._heartbeat_cb(ChildHeartbeat( 

173 agent_id=self.agent_id, 

174 status=ChildStatus.COMPLETED, 

175 progress=1.0, 

176 current_step=self._current_step, 

177 message=output, 

178 iteration=self._iteration, 

179 )) 

180 

181 async def fail(self, error: str) -> None: 

182 """子Agent报告失败。""" 

183 if self._heartbeat_cb: 

184 await self._heartbeat_cb(ChildHeartbeat( 

185 agent_id=self.agent_id, 

186 status=ChildStatus.FAILED, 

187 progress=self._progress, 

188 current_step=self._current_step, 

189 message=error, 

190 iteration=self._iteration, 

191 )) 

192 

193 

194class ChildHandle: 

195 """父Agent视角 — 管控一个子Agent。""" 

196 

197 def __init__( 

198 self, 

199 agent_id: str, 

200 task: str, 

201 mode: str, 

202 timeout: float | None = None, 

203 heartbeat_interval: float = 2.0, 

204 ): 

205 self.info = ChildInfo( 

206 agent_id=agent_id, 

207 task=task, 

208 mode=mode, 

209 heartbeat_interval=heartbeat_interval, 

210 timeout=timeout, 

211 ) 

212 self._cancel_flag = False 

213 self._pause_flag = False 

214 self._resume_event = asyncio.Event() 

215 self._resume_event.set() # 默认未暂停 

216 self.shared_state = SharedState() 

217 self.context: ChildContext | None = None 

218 

219 @property 

220 def agent_id(self) -> str: 

221 return self.info.agent_id 

222 

223 @property 

224 def status(self) -> ChildStatus: 

225 return self.info.status 

226 

227 def create_context(self) -> ChildContext: 

228 """为子Agent创建 ChildContext。""" 

229 ctx = ChildContext( 

230 agent_id=self.agent_id, 

231 heartbeat_callback=self._receive_heartbeat, 

232 on_cancel=self._is_cancelled, 

233 on_pause=self._wait_if_paused, 

234 shared_state=self.shared_state, 

235 ) 

236 self.context = ctx 

237 return ctx 

238 

239 async def _receive_heartbeat(self, hb: ChildHeartbeat) -> None: 

240 """接收子Agent心跳。""" 

241 self.info.last_heartbeat = time.time() 

242 self.info.status = hb.status 

243 self.info.progress = hb.progress 

244 self.info.current_step = hb.current_step 

245 self.info.iterations = hb.iteration 

246 if hb.status == ChildStatus.FAILED: 

247 self.info.error = hb.message 

248 elif hb.status == ChildStatus.COMPLETED: 

249 self.info.output = hb.message 

250 

251 def _is_cancelled(self) -> bool: 

252 return self._cancel_flag 

253 

254 async def _wait_if_paused(self) -> None: 

255 await self._resume_event.wait() 

256 

257 async def cancel(self) -> None: 

258 """取消子Agent。""" 

259 self._cancel_flag = True 

260 self.info.status = ChildStatus.CANCELLED 

261 

262 async def pause(self) -> None: 

263 """暂停子Agent。""" 

264 self._pause_flag = True 

265 self._resume_event.clear() 

266 self.info.status = ChildStatus.PAUSED 

267 if self.context: 

268 self.context._paused = True 

269 

270 async def resume(self) -> None: 

271 """恢复子Agent。""" 

272 self._pause_flag = False 

273 self._resume_event.set() 

274 self.info.status = ChildStatus.RUNNING 

275 if self.context: 

276 self.context._paused = False 

277 

278 def check_timeout(self) -> bool: 

279 """检查是否超时,返回 True 表示已超时。""" 

280 if self.info.timeout is None: 

281 return False 

282 elapsed = time.time() - self.info.spawned_at 

283 return elapsed > self.info.timeout 

284 

285 def check_heartbeat_timeout(self) -> bool: 

286 """检查心跳是否超时(3倍心跳间隔无响应视为失联)。""" 

287 elapsed = time.time() - self.info.last_heartbeat 

288 return elapsed > self.info.heartbeat_interval * 3 

289 

290 def get_status(self) -> dict[str, Any]: 

291 """获取子Agent状态摘要。""" 

292 return { 

293 "agent_id": self.info.agent_id, 

294 "status": self.info.status.value, 

295 "progress": self.info.progress, 

296 "current_step": self.info.current_step, 

297 "iterations": self.info.iterations, 

298 "elapsed": time.time() - self.info.spawned_at, 

299 "error": self.info.error, 

300 }