Coverage for agentos/subagent/parent_child.py: 99%
163 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 13:55 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 13:55 +0800
1"""
2子Agent父子通信 — 状态共享、心跳、生命周期管理。
3父Agent通过 ChildHandle 管控子Agent;子Agent通过 ChildContext 向父Agent报告。
4"""
6from __future__ import annotations
8import asyncio
9import time
10import uuid
11from dataclasses import dataclass, field
12from enum import Enum
13from typing import Any, Callable, Awaitable
16class ChildStatus(str, Enum):
17 """子Agent运行状态。"""
18 IDLE = "idle"
19 RUNNING = "running"
20 PAUSED = "paused"
21 COMPLETED = "completed"
22 FAILED = "failed"
23 CANCELLED = "cancelled"
24 TIMEOUT = "timeout"
27@dataclass
28class ChildHeartbeat:
29 """子Agent心跳包。"""
30 agent_id: str
31 status: ChildStatus = ChildStatus.RUNNING
32 progress: float = 0.0 # 0.0 ~ 1.0
33 current_step: str = ""
34 message: str = ""
35 iteration: int = 0
36 timestamp: float = field(default_factory=time.time)
39@dataclass
40class ChildInfo:
41 """子Agent元信息(父Agent侧)。"""
42 agent_id: str
43 task: str
44 mode: str
45 status: ChildStatus = ChildStatus.IDLE
46 spawned_at: float = field(default_factory=time.time)
47 last_heartbeat: float = field(default_factory=time.time)
48 heartbeat_interval: float = 2.0 # 期望心跳间隔(秒)
49 timeout: float | None = None # 超时(秒),None=无超时
50 progress: float = 0.0
51 current_step: str = ""
52 iterations: int = 0
53 error: str | None = None
54 output: str = ""
57class SharedState:
58 """父子共享状态(线程安全)。"""
60 def __init__(self):
61 self._lock = asyncio.Lock()
62 self._data: dict[str, Any] = {}
64 async def set(self, key: str, value: Any) -> None:
65 async with self._lock:
66 self._data[key] = value
68 async def get(self, key: str, default: Any = None) -> Any:
69 async with self._lock:
70 return self._data.get(key, default)
72 async def update(self, mapping: dict[str, Any]) -> None:
73 async with self._lock:
74 self._data.update(mapping)
76 async def snapshot(self) -> dict[str, Any]:
77 async with self._lock:
78 return dict(self._data)
80 def set_sync(self, key: str, value: Any) -> None:
81 """同步写(非协程场景)。"""
82 self._data[key] = value
84 def get_sync(self, key: str, default: Any = None) -> Any:
85 """同步读(非协程场景)。"""
86 return self._data.get(key, default)
89class ChildContext:
90 """子Agent视角 — 向父Agent报告状态、检查控制信号。"""
92 def __init__(
93 self,
94 agent_id: str,
95 heartbeat_callback: Callable[[ChildHeartbeat], Awaitable[None]] | None = None,
96 on_cancel: Callable[[], bool] | None = None,
97 on_pause: Callable[[], Awaitable[None]] | None = None,
98 shared_state: SharedState | None = None,
99 ):
100 self.agent_id = agent_id
101 self._heartbeat_cb = heartbeat_callback
102 self._cancel_check = on_cancel or (lambda: False)
103 self._pause_cb = on_pause or (lambda: asyncio.sleep(0))
104 self.shared_state = shared_state or SharedState()
105 self._cancelled = False
106 self._paused = False
107 self._progress = 0.0
108 self._current_step = ""
109 self._iteration = 0
111 @property
112 def cancelled(self) -> bool:
113 return self._cancelled
115 @property
116 def paused(self) -> bool:
117 return self._paused
119 @property
120 def progress(self) -> float:
121 return self._progress
123 async def report_progress(
124 self,
125 progress: float,
126 step: str = "",
127 message: str = "",
128 ) -> None:
129 """子Agent报告进度。"""
130 self._progress = max(0.0, min(1.0, progress))
131 self._current_step = step
132 if self._heartbeat_cb:
133 await self._heartbeat_cb(ChildHeartbeat(
134 agent_id=self.agent_id,
135 status=ChildStatus.RUNNING,
136 progress=self._progress,
137 current_step=step,
138 message=message,
139 iteration=self._iteration,
140 ))
142 async def step(self, iteration: int, step: str = "") -> None:
143 """子Agent标记一个执行步。"""
144 self._iteration = iteration
145 self._current_step = step
147 async def check_control(self) -> ChildStatus:
148 """检查父Agent控制信号,返回应执行的操作。"""
149 if self._cancel_check():
150 self._cancelled = True
151 return ChildStatus.CANCELLED
152 if self._paused:
153 await self._pause_cb()
154 return ChildStatus.PAUSED
155 return ChildStatus.RUNNING
157 async def send_heartbeat(self, message: str = "") -> None:
158 """子Agent发送心跳。"""
159 if self._heartbeat_cb:
160 await self._heartbeat_cb(ChildHeartbeat(
161 agent_id=self.agent_id,
162 status=ChildStatus.RUNNING,
163 progress=self._progress,
164 current_step=self._current_step,
165 message=message,
166 iteration=self._iteration,
167 ))
169 async def done(self, output: str = "") -> None:
170 """子Agent标记完成。"""
171 if self._heartbeat_cb:
172 await self._heartbeat_cb(ChildHeartbeat(
173 agent_id=self.agent_id,
174 status=ChildStatus.COMPLETED,
175 progress=1.0,
176 current_step=self._current_step,
177 message=output,
178 iteration=self._iteration,
179 ))
181 async def fail(self, error: str) -> None:
182 """子Agent报告失败。"""
183 if self._heartbeat_cb:
184 await self._heartbeat_cb(ChildHeartbeat(
185 agent_id=self.agent_id,
186 status=ChildStatus.FAILED,
187 progress=self._progress,
188 current_step=self._current_step,
189 message=error,
190 iteration=self._iteration,
191 ))
194class ChildHandle:
195 """父Agent视角 — 管控一个子Agent。"""
197 def __init__(
198 self,
199 agent_id: str,
200 task: str,
201 mode: str,
202 timeout: float | None = None,
203 heartbeat_interval: float = 2.0,
204 ):
205 self.info = ChildInfo(
206 agent_id=agent_id,
207 task=task,
208 mode=mode,
209 heartbeat_interval=heartbeat_interval,
210 timeout=timeout,
211 )
212 self._cancel_flag = False
213 self._pause_flag = False
214 self._resume_event = asyncio.Event()
215 self._resume_event.set() # 默认未暂停
216 self.shared_state = SharedState()
217 self.context: ChildContext | None = None
219 @property
220 def agent_id(self) -> str:
221 return self.info.agent_id
223 @property
224 def status(self) -> ChildStatus:
225 return self.info.status
227 def create_context(self) -> ChildContext:
228 """为子Agent创建 ChildContext。"""
229 ctx = ChildContext(
230 agent_id=self.agent_id,
231 heartbeat_callback=self._receive_heartbeat,
232 on_cancel=self._is_cancelled,
233 on_pause=self._wait_if_paused,
234 shared_state=self.shared_state,
235 )
236 self.context = ctx
237 return ctx
239 async def _receive_heartbeat(self, hb: ChildHeartbeat) -> None:
240 """接收子Agent心跳。"""
241 self.info.last_heartbeat = time.time()
242 self.info.status = hb.status
243 self.info.progress = hb.progress
244 self.info.current_step = hb.current_step
245 self.info.iterations = hb.iteration
246 if hb.status == ChildStatus.FAILED:
247 self.info.error = hb.message
248 elif hb.status == ChildStatus.COMPLETED:
249 self.info.output = hb.message
251 def _is_cancelled(self) -> bool:
252 return self._cancel_flag
254 async def _wait_if_paused(self) -> None:
255 await self._resume_event.wait()
257 async def cancel(self) -> None:
258 """取消子Agent。"""
259 self._cancel_flag = True
260 self.info.status = ChildStatus.CANCELLED
262 async def pause(self) -> None:
263 """暂停子Agent。"""
264 self._pause_flag = True
265 self._resume_event.clear()
266 self.info.status = ChildStatus.PAUSED
267 if self.context:
268 self.context._paused = True
270 async def resume(self) -> None:
271 """恢复子Agent。"""
272 self._pause_flag = False
273 self._resume_event.set()
274 self.info.status = ChildStatus.RUNNING
275 if self.context:
276 self.context._paused = False
278 def check_timeout(self) -> bool:
279 """检查是否超时,返回 True 表示已超时。"""
280 if self.info.timeout is None:
281 return False
282 elapsed = time.time() - self.info.spawned_at
283 return elapsed > self.info.timeout
285 def check_heartbeat_timeout(self) -> bool:
286 """检查心跳是否超时(3倍心跳间隔无响应视为失联)。"""
287 elapsed = time.time() - self.info.last_heartbeat
288 return elapsed > self.info.heartbeat_interval * 3
290 def get_status(self) -> dict[str, Any]:
291 """获取子Agent状态摘要。"""
292 return {
293 "agent_id": self.info.agent_id,
294 "status": self.info.status.value,
295 "progress": self.info.progress,
296 "current_step": self.info.current_step,
297 "iterations": self.info.iterations,
298 "elapsed": time.time() - self.info.spawned_at,
299 "error": self.info.error,
300 }