Coverage for tools / planning.py: 49%

35 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-29 02:55 +0800

1from pydantic import BaseModel, Field 

2from qrclaw.tools.registry import register 

3from qrclaw.logger import get_logger 

4 

5logger = get_logger("qrclaw.tools.planning") 

6 

7class PlanStep(BaseModel): 

8 id: int = Field(description="步骤编号,从 1 开始") 

9 description: str = Field(description="这一步要做什么") 

10 depends_on: list[int] = Field(default=[], description="依赖的前置步骤编号列表,没有依赖则为空") 

11 

12class CreatePlanArgs(BaseModel): 

13 goal: str = Field(description="任务目标,简要描述要完成什么") 

14 steps: list[PlanStep] = Field(description="执行步骤列表,按顺序排列") 

15 

16class CompleteStepArgs(BaseModel): 

17 step_id: int = Field(description="已完成的步骤编号") 

18 

19@register(description="为复杂任务创建执行计划,拆解成有序步骤后逐步执行", args_model=CreatePlanArgs) 

20def create_plan(goal: str, steps: list[dict]) -> str: 

21 from qrclaw.agent import get_session 

22 logger.info(f"创建执行计划: {goal}, 共 {len(steps)}") 

23 session = get_session() 

24 if session: 

25 session.set_plan(goal, steps) 

26 return f"计划已创建,目标:{goal},共 {len(steps)} 步。计划已注入上下文,请从 Step 1 开始执行,每完成一步调用 complete_step 标记完成后再继续下一步。" 

27 

28@register(description="标记某个计划步骤为已完成,完成后继续执行下一步", args_model=CompleteStepArgs) 

29def complete_step(step_id: int) -> str: 

30 from qrclaw.agent import get_session 

31 session = get_session() 

32 if not session or not session.active_plan: 

33 return "当前没有活跃的执行计划" 

34 all_done = session.complete_step(step_id) 

35 if all_done: 

36 logger.info(f"步骤 {step_id} 完成,所有步骤已全部完成") 

37 return f"Step {step_id} 已完成。所有步骤全部完成,计划结束。" 

38 # 找下一个未完成的步骤 

39 remaining = [s for s in session.active_plan["steps"] if not s["done"]] 

40 next_step = remaining[0] 

41 logger.info(f"步骤 {step_id} 完成,下一步: Step {next_step['id']}") 

42 return f"Step {step_id} 已完成。请继续执行 Step {next_step['id']}: {next_step['description']}"