Coverage for tools / wait_agents.py: 19%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-29 02:55 +0800

1""" 

2wait_agents 工具 

3 

4等待所有后台子 agent 完成,收集并返回结果。 

5使用 thread.join() 阻塞等待,不轮询,零 CPU 浪费。 

6""" 

7from pydantic import BaseModel, Field 

8from qrclaw.tools.registry import register 

9from qrclaw.logger import get_logger 

10 

11logger = get_logger("qrclaw.tools.wait_agents") 

12 

13 

14class WaitAgentsArgs(BaseModel): 

15 agent_ids: list[str] = Field( 

16 default_factory=list, 

17 description="要等待的子 agent ID 列表,不填则等待所有正在运行的子 agent" 

18 ) 

19 

20 

21@register( 

22 description="等待后台子 agent 完成并收集结果。不填 agent_ids 则等待全部。", 

23 args_model=WaitAgentsArgs, 

24) 

25def wait_agents(agent_ids: list[str] = None) -> str: 

26 """ 

27 阻塞等待子 agent 完成,使用 thread.join() 零 CPU 等待。 

28 

29 Args: 

30 agent_ids: 要等待的子 agent ID 列表,空则等待全部 

31 Returns: 

32 str: 所有子 agent 的执行结果汇总 

33 """ 

34 from qrclaw.tools.spawn_agent import get_task_pool, get_task_pool_lock 

35 

36 task_pool = get_task_pool() 

37 lock = get_task_pool_lock() 

38 

39 with lock: 

40 if not task_pool: 

41 return "没有正在运行的子 agent" 

42 

43 # 确定要等待的 agent 

44 if agent_ids: 

45 targets = {aid: task_pool[aid] for aid in agent_ids if aid in task_pool} 

46 not_found = [aid for aid in agent_ids if aid not in task_pool] 

47 if not_found: 

48 logger.warning(f"未找到子 agent: {not_found}") 

49 else: 

50 targets = dict(task_pool) 

51 

52 # 只等 running 状态的 

53 to_wait = {aid: t for aid, t in targets.items() if t["status"] == "running"} 

54 already_done = {aid: t for aid, t in targets.items() if t["status"] != "running"} 

55 

56 logger.info(f"等待子 agent 完成: {list(to_wait.keys())}") 

57 

58 # thread.join() 阻塞等待,线程跑完自动唤醒,零 CPU 消耗 

59 for agent_id, task in to_wait.items(): 

60 logger.info(f"等待子 agent: {agent_id}") 

61 task["thread"].join() 

62 logger.info(f"子 agent {agent_id} 已完成") 

63 

64 # 汇总所有结果 

65 lines = ["## 子 agent 执行结果汇总", ""] 

66 

67 with lock: 

68 all_targets = {**already_done, **{aid: task_pool[aid] for aid in to_wait}} 

69 

70 for agent_id, task in all_targets.items(): 

71 status_label = "✅ 完成" if task["status"] == "done" else "❌ 出错" 

72 lines.append(f"### [{status_label}] {agent_id}") 

73 lines.append("") 

74 lines.append(task["result"] or "无结果") 

75 lines.append("") 

76 

77 # 清空已完成的任务 

78 with lock: 

79 for agent_id in all_targets: 

80 if task_pool.get(agent_id, {}).get("status") != "running": 

81 task_pool.pop(agent_id, None) 

82 

83 logger.info("所有子 agent 结果已收集,任务池已清理") 

84 return "\n".join(lines)