Coverage for tools / planning.py: 49%
35 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 02:55 +0800
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 02:55 +0800
1from pydantic import BaseModel, Field
2from qrclaw.tools.registry import register
3from qrclaw.logger import get_logger
5logger = get_logger("qrclaw.tools.planning")
7class PlanStep(BaseModel):
8 id: int = Field(description="步骤编号,从 1 开始")
9 description: str = Field(description="这一步要做什么")
10 depends_on: list[int] = Field(default=[], description="依赖的前置步骤编号列表,没有依赖则为空")
12class CreatePlanArgs(BaseModel):
13 goal: str = Field(description="任务目标,简要描述要完成什么")
14 steps: list[PlanStep] = Field(description="执行步骤列表,按顺序排列")
16class CompleteStepArgs(BaseModel):
17 step_id: int = Field(description="已完成的步骤编号")
19@register(description="为复杂任务创建执行计划,拆解成有序步骤后逐步执行", args_model=CreatePlanArgs)
20def create_plan(goal: str, steps: list[dict]) -> str:
21 from qrclaw.agent import get_session
22 logger.info(f"创建执行计划: {goal}, 共 {len(steps)} 步")
23 session = get_session()
24 if session:
25 session.set_plan(goal, steps)
26 return f"计划已创建,目标:{goal},共 {len(steps)} 步。计划已注入上下文,请从 Step 1 开始执行,每完成一步调用 complete_step 标记完成后再继续下一步。"
28@register(description="标记某个计划步骤为已完成,完成后继续执行下一步", args_model=CompleteStepArgs)
29def complete_step(step_id: int) -> str:
30 from qrclaw.agent import get_session
31 session = get_session()
32 if not session or not session.active_plan:
33 return "当前没有活跃的执行计划"
34 all_done = session.complete_step(step_id)
35 if all_done:
36 logger.info(f"步骤 {step_id} 完成,所有步骤已全部完成")
37 return f"Step {step_id} 已完成。所有步骤全部完成,计划结束。"
38 # 找下一个未完成的步骤
39 remaining = [s for s in session.active_plan["steps"] if not s["done"]]
40 next_step = remaining[0]
41 logger.info(f"步骤 {step_id} 完成,下一步: Step {next_step['id']}")
42 return f"Step {step_id} 已完成。请继续执行 Step {next_step['id']}: {next_step['description']}"