Coverage for tools / wait_agents.py: 19%
42 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 02:55 +0800
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 02:55 +0800
1"""
2wait_agents 工具
4等待所有后台子 agent 完成,收集并返回结果。
5使用 thread.join() 阻塞等待,不轮询,零 CPU 浪费。
6"""
7from pydantic import BaseModel, Field
8from qrclaw.tools.registry import register
9from qrclaw.logger import get_logger
11logger = get_logger("qrclaw.tools.wait_agents")
14class WaitAgentsArgs(BaseModel):
15 agent_ids: list[str] = Field(
16 default_factory=list,
17 description="要等待的子 agent ID 列表,不填则等待所有正在运行的子 agent"
18 )
21@register(
22 description="等待后台子 agent 完成并收集结果。不填 agent_ids 则等待全部。",
23 args_model=WaitAgentsArgs,
24)
25def wait_agents(agent_ids: list[str] = None) -> str:
26 """
27 阻塞等待子 agent 完成,使用 thread.join() 零 CPU 等待。
29 Args:
30 agent_ids: 要等待的子 agent ID 列表,空则等待全部
31 Returns:
32 str: 所有子 agent 的执行结果汇总
33 """
34 from qrclaw.tools.spawn_agent import get_task_pool, get_task_pool_lock
36 task_pool = get_task_pool()
37 lock = get_task_pool_lock()
39 with lock:
40 if not task_pool:
41 return "没有正在运行的子 agent"
43 # 确定要等待的 agent
44 if agent_ids:
45 targets = {aid: task_pool[aid] for aid in agent_ids if aid in task_pool}
46 not_found = [aid for aid in agent_ids if aid not in task_pool]
47 if not_found:
48 logger.warning(f"未找到子 agent: {not_found}")
49 else:
50 targets = dict(task_pool)
52 # 只等 running 状态的
53 to_wait = {aid: t for aid, t in targets.items() if t["status"] == "running"}
54 already_done = {aid: t for aid, t in targets.items() if t["status"] != "running"}
56 logger.info(f"等待子 agent 完成: {list(to_wait.keys())}")
58 # thread.join() 阻塞等待,线程跑完自动唤醒,零 CPU 消耗
59 for agent_id, task in to_wait.items():
60 logger.info(f"等待子 agent: {agent_id}")
61 task["thread"].join()
62 logger.info(f"子 agent {agent_id} 已完成")
64 # 汇总所有结果
65 lines = ["## 子 agent 执行结果汇总", ""]
67 with lock:
68 all_targets = {**already_done, **{aid: task_pool[aid] for aid in to_wait}}
70 for agent_id, task in all_targets.items():
71 status_label = "✅ 完成" if task["status"] == "done" else "❌ 出错"
72 lines.append(f"### [{status_label}] {agent_id}")
73 lines.append("")
74 lines.append(task["result"] or "无结果")
75 lines.append("")
77 # 清空已完成的任务
78 with lock:
79 for agent_id in all_targets:
80 if task_pool.get(agent_id, {}).get("status") != "running":
81 task_pool.pop(agent_id, None)
83 logger.info("所有子 agent 结果已收集,任务池已清理")
84 return "\n".join(lines)