Coverage for tools / spawn_agent.py: 42%

66 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-29 02:55 +0800

1""" 

2spawn_agent 工具 

3 

4允许主 agent 并行创建多个子 agent 执行任务。 

5子 agent 在后台线程运行,立即返回,不阻塞主 agent。 

6通过 wait_agents 工具等待并收集所有结果。 

7 

8重要:子 agent 不允许再派生子 agent,防止无限嵌套。 

9""" 

10import threading 

11from rich.console import Console 

12from rich.panel import Panel 

13from pydantic import BaseModel, Field 

14from qrclaw.tools.registry import register 

15from qrclaw.logger import get_logger 

16 

17logger = get_logger("qrclaw.tools.spawn_agent") 

18 

19# 全局任务池 

20_task_pool: dict = {} 

21_task_pool_lock = threading.Lock() 

22 

23# 全局 console,由 app.py 启动时注入 

24_console: Console = None 

25 

26def set_console(console: Console): 

27 """注入全局 console(由 app.py 调用)""" 

28 global _console 

29 _console = console 

30 

31 

32def get_task_pool() -> dict: 

33 """获取任务池(供 wait_agents 使用)""" 

34 return _task_pool 

35 

36 

37def get_task_pool_lock() -> threading.Lock: 

38 """获取任务池锁""" 

39 return _task_pool_lock 

40 

41 

42# 存储子 agent 到父 agent 的映射(用于权限继承) 

43_parent_agent_map: dict = {} 

44_parent_agent_map_lock = threading.Lock() 

45 

46 

47def get_parent_agent_id(sub_agent_id: str) -> str | None: 

48 """获取子 agent 的父 agent ID""" 

49 return _parent_agent_map.get(sub_agent_id) 

50 

51 

52def set_parent_agent(sub_agent_id: str, parent_agent_id: str): 

53 """设置子 agent 的父 agent ID""" 

54 with _parent_agent_map_lock: 

55 _parent_agent_map[sub_agent_id] = parent_agent_id 

56 

57 

58def clear_parent_agent(sub_agent_id: str): 

59 """清除子 agent 的父 agent 映射""" 

60 with _parent_agent_map_lock: 

61 _parent_agent_map.pop(sub_agent_id, None) 

62 

63 

64class SpawnAgentArgs(BaseModel): 

65 agent_id: str = Field(description="子 agent 的 ID,例如 'coder'、'reviewer'") 

66 task: str = Field(description="交给子 agent 的任务描述,要清晰具体") 

67 

68 

69@register( 

70 description="在后台启动子 agent 并行执行任务,立即返回不阻塞。任务可拆分时批量调用此工具启动多个子 agent,再用 wait_agents 统一等待结果。子 agent 完成时结果自动打印。", 

71 args_model=SpawnAgentArgs, 

72) 

73def spawn_agent(agent_id: str, task: str) -> str: 

74 """ 

75 在后台线程启动子 agent,立即返回。 

76 

77 Args: 

78 agent_id: 子 agent ID 

79 task: 子 agent 要执行的任务 

80 Returns: 

81 str: 启动确认信息 

82 """ 

83 from qrclaw.agent import get_workspace, run_sub_agent, is_sub_agent 

84 from qrclaw.workspace import Workspace 

85 

86 # 关键检查:子 agent 不允许再派生子 agent,防止无限嵌套 

87 if is_sub_agent(): 

88 return "错误:子 agent 不允许再派生子 agent,这会导致无限嵌套。请直接执行任务,不要使用 spawn_agent。" 

89 

90 # 如果该 agent_id 已在运行,拒绝重复启动 

91 with _task_pool_lock: 

92 if agent_id in _task_pool and _task_pool[agent_id]["status"] == "running": 

93 return f"子 agent '{agent_id}' 已在运行中,请等待完成或使用不同的 agent_id" 

94 

95 # 直接取当前 workspace,不再靠 session 路径反推 

96 # 这样无论当前是顶层 agent 还是子 agent,新建的子 agent 都是同级的 

97 main_workspace = get_workspace() or Workspace("default") 

98 sub_workspace = main_workspace.sub_agent(agent_id) 

99 

100 # 记录子 agent 的父 agent ID(用于权限继承) 

101 set_parent_agent(agent_id, main_workspace.agent_id) 

102 

103 logger.info(f"启动子 agent: {agent_id}, 工作空间: {sub_workspace.root}, 父 agent: {main_workspace.agent_id}") 

104 

105 def _run(): 

106 try: 

107 result = run_sub_agent(task, sub_workspace) 

108 with _task_pool_lock: 

109 _task_pool[agent_id]["status"] = "done" 

110 _task_pool[agent_id]["result"] = result 

111 logger.info(f"子 agent {agent_id} 完成") 

112 # 完成后直接打印到控制台,不需要主 agent 主动等待 

113 if _console: 

114 _console.print() 

115 _console.print(Panel( 

116 result, 

117 title=f"[bold green]子 agent '{agent_id}' 完成[/bold green]", 

118 border_style="green", 

119 expand=False, 

120 )) 

121 _console.print() 

122 except Exception as e: 

123 logger.error(f"子 agent {agent_id} 出错: {e}", exc_info=True) 

124 with _task_pool_lock: 

125 _task_pool[agent_id]["status"] = "error" 

126 _task_pool[agent_id]["result"] = f"执行出错: {e}" 

127 if _console: 

128 _console.print(f"\n[bold red]子 agent '{agent_id}' 执行出错: {e}[/bold red]\n") 

129 finally: 

130 # 清理父 agent 映射 

131 clear_parent_agent(agent_id) 

132 

133 thread = threading.Thread(target=_run, name=f"sub-agent-{agent_id}", daemon=True) 

134 

135 with _task_pool_lock: 

136 _task_pool[agent_id] = { 

137 "status": "running", 

138 "result": None, 

139 "thread": thread, 

140 } 

141 

142 thread.start() 

143 return f"子 agent '{agent_id}' 已在后台启动,任务:{task[:50]}{'...' if len(task) > 50 else ''}"