"""ReWOO: Reasoning WithOut Observation — plan-then-tool-execute.
Xu et al. 2023 — `ReWOO: Decoupling Reasoning from Observations
for Efficient Augmented Language Models
<https://arxiv.org/abs/2305.18323>`_. The cost-saving sibling of
:class:`~jeevesagent.architecture.PlanAndExecute`: each step in the
plan is a real **tool call**, with ``{{En}}`` placeholder
substitution to reference prior step outputs. Independent steps
(no dependency on each other) run in **parallel**.
Total cost: **2 LLM calls + N tool calls**. ReAct on the same task
needs roughly N+1 LLM calls (one per turn). For tool-heavy workloads
where the planner can predict the call sequence upfront, ReWOO is
30-50% cheaper.
Pattern
-------
1. **Planner.** ONE LLM call. Output is a JSON list of steps. Each
step has shape::
{"id": "E1", "tool": "<tool_name>", "args": {...}}
Args may reference prior steps via ``{{En}}`` placeholders —
``{"args": {"url": "{{E1}}"}}`` will use E1's output as the
``url`` arg when E2 runs.
2. **Worker.** Compute topological levels from the plan's
placeholder dependencies. For each level, substitute ``{{En}}``
placeholders in each step's args with prior step outputs, then
dispatch the level's steps (in parallel by default) as
``deps.tools.call(...)`` invocations routed through
``deps.runtime.step``.
3. **Solver.** ONE LLM call. Given the original task and the
step→output map, produce the final answer.
Strengths
---------
* **Cheaper than ReAct on tool-heavy multi-step tasks.** Two LLM
calls cap the LLM cost regardless of plan length.
* **Parallelism for free.** Independent steps within a dependency
level run concurrently via ``anyio.create_task_group`` (the same
primitive Supervisor and ReAct use for parallel tool dispatch).
Pass ``parallel_levels=False`` to run them sequentially instead.
* **Observable plan.** The plan is a structured Pydantic object —
log it, audit it, override it before execution.
Weaknesses
----------
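* **Planner must predict accurately upfront.** No replanning on
failure in v1. If a step fails, the worker records the error in
that step's result and the solver sees it in place of the step's
output. Steps that reference the failed step via ``{{En}}`` still
run, with an empty string substituted for its output.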
* **Limited to known tool names.** A planner that hallucinates a
tool name produces a step that errors at dispatch time.
* **Placeholder substitution is string-typed.** Tool outputs get
stringified. For structured-output tools, the planner has to
treat outputs as opaque text. Only ids of the form ``E<n>``
(e.g. ``{{E1}}``) are recognized as placeholders.
"""
from __future__ import annotations
import json
import re
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING, Any
import anyio
from pydantic import BaseModel, Field
from ..core.types import Event, Message, Role, ToolResult
from .base import AgentSession, Dependencies
from .helpers import add_usage, text_only_model_call
if TYPE_CHECKING:
from ..agent.api import Agent
# System prompt for the planner LLM. It is rendered with
# ``str.format(tool_descriptions=...)``, so every literal brace is doubled:
# ``{{``/``}}`` render as ``{``/``}``, and ``{{{{E1}}}}`` renders as the
# ``{{E1}}`` placeholder syntax the worker recognizes. (A single-escaped
# ``{{En}}`` would render as ``{En}``, which the worker never substitutes.)
DEFAULT_PLANNER_PROMPT = """\
You produce a step-by-step plan to solve the user's task using the
available tools. Each step is a tool call.
Output ONLY a JSON array of step objects. Each step has:
- "id": "E1", "E2", ... (the letter E followed by a number), unique within the plan
- "tool": the name of one of the available tools
- "args": a dict of arguments to pass to the tool
You may reference a prior step's output in args using `{{{{En}}}}`. The
worker will substitute `{{{{En}}}}` with the actual output of step En
before invoking the tool.
Available tools:
{tool_descriptions}
Output format example:
[
{{"id": "E1", "tool": "web_search", "args": {{"query": "Tokyo weather"}}}},
{{"id": "E2", "tool": "summarize", "args": {{"text": "{{{{E1}}}}"}}}}
]
Output ONLY the JSON array. No prose, no markdown fences.
"""
# System prompt for the solver LLM; used verbatim (not passed through
# ``str.format``).
DEFAULT_SOLVER_PROMPT = """\
You synthesize the final answer from a sequence of tool-call
results. Use the original task and the step outputs to produce the
final answer. Be concise."""
class ReWOOStep(BaseModel):
    """One step of a ReWOO plan: a single tool call.

    String values anywhere inside ``args`` (including nested dicts and
    lists) may embed ``{{En}}`` placeholders; the worker replaces them
    with the referenced step's output text before dispatching the tool.
    """

    # Step identifier, e.g. "E1". Placeholders only recognise ids of the
    # form E<digits>.
    id: str
    # Name of the tool to invoke.
    tool: str
    # Keyword arguments for the tool; may contain ``{{En}}`` placeholders.
    args: dict[str, Any] = Field(default_factory=dict)

    @property
    def depends_on(self) -> list[str]:
        """Sorted, de-duplicated step ids referenced via ``{{En}}`` in ``args``."""
        return _extract_placeholders(self.args)
class ReWOOPlan(BaseModel):
    """An ordered list of ReWOO steps as produced by the planner.

    List order is not the execution order: dependencies are inferred from
    ``{{En}}`` placeholders, and independent steps may run concurrently.
    """

    # Steps in the order the planner emitted them.
    steps: list[ReWOOStep] = Field(default_factory=list)
class ReWOOStepResult(BaseModel):
    """Outcome of executing one :class:`ReWOOStep`.

    On success ``error`` is ``None`` and ``output`` holds the stringified
    tool output; on failure ``output`` is empty and ``error`` describes
    what went wrong.
    """

    # Id of the step this result belongs to.
    step_id: str
    # Tool the step invoked.
    tool: str
    # Stringified tool output ("" when the step failed).
    output: str
    # Failure description, or None when the step succeeded.
    error: str | None = None
class ReWOO:
    """Plan-then-tool-execute architecture with ``{{En}}`` placeholder substitution.

    A single planner LLM call produces a :class:`ReWOOPlan`. Its steps are
    grouped into dependency levels and dispatched as tool calls: levels
    run in order, and steps inside a level run concurrently when
    ``parallel_levels`` is true. A single solver LLM call then turns the
    step outputs into the final answer.

    Args:
        max_steps: Maximum number of plan steps executed. Extra steps
            returned by the planner are dropped and reported through a
            ``rewoo.plan_truncated`` event.
        planner_prompt: System prompt for the planner. It is rendered with
            ``str.format(tool_descriptions=...)``, so any literal braces it
            contains must be doubled. Defaults to ``DEFAULT_PLANNER_PROMPT``.
        solver_prompt: System prompt for the solver, used verbatim.
            Defaults to ``DEFAULT_SOLVER_PROMPT``.
        parallel_levels: When true, independent steps within a level run
            concurrently; otherwise they run one after another.

    Raises:
        ValueError: If ``max_steps`` is less than 1.
    """

    name = "rewoo"

    def __init__(
        self,
        *,
        max_steps: int = 8,
        planner_prompt: str | None = None,
        solver_prompt: str | None = None,
        parallel_levels: bool = True,
    ) -> None:
        if max_steps < 1:
            raise ValueError("max_steps must be >= 1")
        self._max_steps = max_steps
        # ``or`` rather than ``is None``: an empty-string prompt also falls
        # back to the default.
        self._planner_prompt = planner_prompt or DEFAULT_PLANNER_PROMPT
        self._solver_prompt = solver_prompt or DEFAULT_SOLVER_PROMPT
        self._parallel_levels = parallel_levels

    def declared_workers(self) -> dict[str, Agent]:
        """Return no sub-agent workers; ReWOO only dispatches tool calls."""
        return {}

    async def run(
        self,
        session: AgentSession,
        deps: Dependencies,
        prompt: str,
    ) -> AsyncIterator[Event]:
        """Run planner -> leveled tool worker -> solver, yielding events.

        Terminal states:

        * empty or cyclic plan: ``session.output`` holds an explanation and
          no tools are called;
        * budget blocked before a level: ``session.interrupted`` and
          ``session.interruption_reason`` are set and the solver is skipped;
        * success: ``session.output`` holds the solver's answer and
          ``session.metadata`` receives ``rewoo_plan`` / ``rewoo_results``.
        """
        # === 1. Planner ===
        yield Event.architecture_event(session.id, "rewoo.planner_started")
        plan = await self._make_plan(deps, session, prompt)
        planned_steps = len(plan.steps)
        if planned_steps > self._max_steps:
            plan = ReWOOPlan(steps=plan.steps[: self._max_steps])
            # Make truncation observable: dropped steps never run, and any
            # ``{{En}}`` that points at one of them stays unresolved.
            yield Event.architecture_event(
                session.id,
                "rewoo.plan_truncated",
                planned_steps=planned_steps,
                max_steps=self._max_steps,
            )
        yield Event.architecture_event(
            session.id,
            "rewoo.plan_created",
            num_steps=len(plan.steps),
            steps=[
                {"id": s.id, "tool": s.tool, "depends_on": s.depends_on}
                for s in plan.steps
            ],
        )
        if not plan.steps:
            session.output = "Planner produced no steps; cannot execute."
            yield Event.architecture_event(session.id, "rewoo.empty_plan")
            return

        # === 2. Topological worker ===
        levels = _topological_levels(plan.steps)
        if levels is None:
            # Cycle detected — planner-side bug.
            session.output = (
                "Planner produced a plan with cyclic dependencies; "
                "cannot execute."
            )
            yield Event.architecture_event(session.id, "rewoo.cyclic_plan")
            return

        results: dict[str, ReWOOStepResult] = {}
        for level_index, level in enumerate(levels):
            # The budget is checked once per level, before any of its steps
            # are dispatched.
            status = await deps.budget.allows_step()
            if status.blocked:
                session.interrupted = True
                session.interruption_reason = f"budget:{status.reason}"
                yield Event.budget_exceeded(session.id, status)
                return
            if status.warn:
                yield Event.budget_warning(session.id, status)
            yield Event.architecture_event(
                session.id,
                "rewoo.level_started",
                level=level_index,
                step_ids=[s.id for s in level],
            )
            level_results = await _execute_level(
                deps, session, level, results, self._parallel_levels
            )
            results.update(level_results)
            for step_id, sr in level_results.items():
                yield Event.architecture_event(
                    session.id,
                    "rewoo.step_completed",
                    step_id=step_id,
                    tool=sr.tool,
                    error=sr.error,
                    output=sr.output[:300],
                )

        # === 3. Solver ===
        yield Event.architecture_event(session.id, "rewoo.solver_started")
        final = await self._solve(deps, session, prompt, plan, results)
        session.output = final
        session.metadata["rewoo_plan"] = plan.model_dump()
        session.metadata["rewoo_results"] = {
            sid: r.model_dump() for sid, r in results.items()
        }
        yield Event.architecture_event(
            session.id,
            "rewoo.completed",
            num_steps=len(plan.steps),
            final=final[:300],
        )

    # ---- helpers -----------------------------------------------------

    @staticmethod
    async def _record_llm_usage(
        deps: Dependencies, session: AgentSession, usage: Any
    ) -> None:
        """Charge one LLM call to the budget and to the session's usage/turns."""
        await deps.budget.consume(
            tokens_in=usage.input_tokens,
            tokens_out=usage.output_tokens,
            cost_usd=usage.cost_usd,
        )
        session.cumulative_usage = add_usage(session.cumulative_usage, usage)
        session.turns += 1

    async def _make_plan(
        self,
        deps: Dependencies,
        session: AgentSession,
        prompt: str,
    ) -> ReWOOPlan:
        """Ask the planner LLM for a plan over the registered tools.

        Unparseable planner output yields an empty plan (see
        ``_parse_rewoo_plan``).
        """
        tool_defs = await deps.tools.list_tools()
        tool_descriptions = (
            "\n".join(f" - {t.name}: {t.description}" for t in tool_defs)
            or " (no tools registered)"
        )
        planner_text = self._planner_prompt.format(
            tool_descriptions=tool_descriptions
        )
        msgs = [
            Message(role=Role.SYSTEM, content=planner_text),
            Message(role=Role.USER, content=prompt),
        ]
        text, usage = await text_only_model_call(deps, "rewoo_planner", msgs)
        await self._record_llm_usage(deps, session, usage)
        return _parse_rewoo_plan(text)

    async def _solve(
        self,
        deps: Dependencies,
        session: AgentSession,
        prompt: str,
        plan: ReWOOPlan,
        results: dict[str, ReWOOStepResult],
    ) -> str:
        """Ask the solver LLM for the final answer given all step outputs.

        Steps are listed in plan order; failed steps appear as
        ``ERROR: <message>`` and steps with no recorded result are skipped.
        """
        sections: list[str] = []
        for step in plan.steps:
            if step.id not in results:
                continue
            sr = results[step.id]
            body = f"ERROR: {sr.error}" if sr.error else sr.output
            sections.append(f"Step {step.id} ({sr.tool}):\n{body}")
        results_text = "\n\n".join(sections)
        user_content = (
            f"Original task:\n{prompt}\n\n"
            f"Step outputs:\n{results_text}\n\n"
            "Produce the final answer."
        )
        msgs = [
            Message(role=Role.SYSTEM, content=self._solver_prompt),
            Message(role=Role.USER, content=user_content),
        ]
        text, usage = await text_only_model_call(deps, "rewoo_solver", msgs)
        await self._record_llm_usage(deps, session, usage)
        return text.strip()
# ---------------------------------------------------------------------------
# Placeholder + topological helpers
# ---------------------------------------------------------------------------
# Matches ``{{E<digits>}}`` exactly (no inner whitespace); group 1 is the
# referenced step id, e.g. "E1".
_PLACEHOLDER_RE = re.compile(r"\{\{(E\d+)\}\}")


def _extract_placeholders(value: Any) -> list[str]:
    """Return every step id referenced by a ``{{En}}`` placeholder in ``value``.

    Strings are scanned directly; dict *values* and list items are searched
    recursively. Dict keys and any other types are ignored. The result is
    de-duplicated and sorted lexicographically; it is what
    ``ReWOOStep.depends_on`` reports.
    """
    referenced: set[str] = set()
    pending: list[Any] = [value]
    while pending:
        node = pending.pop()
        if isinstance(node, str):
            # One capturing group, so findall yields the ids themselves.
            referenced.update(_PLACEHOLDER_RE.findall(node))
        elif isinstance(node, dict):
            pending.extend(node.values())
        elif isinstance(node, list):
            pending.extend(node)
    return sorted(referenced)
def _substitute_placeholders(
    value: Any, results: dict[str, ReWOOStepResult]
) -> Any:
    """Return a copy of ``value`` with ``{{En}}`` placeholders filled in.

    Each placeholder inside a string is replaced by the referenced step's
    ``output`` text; placeholders naming a step absent from ``results`` are
    left verbatim. Dict values and list items are processed recursively
    (dict keys untouched); any other value is returned as-is.
    """
    if isinstance(value, dict):
        return {
            key: _substitute_placeholders(item, results)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [_substitute_placeholders(item, results) for item in value]
    if not isinstance(value, str):
        return value

    def _fill(match: re.Match[str]) -> str:
        step_id = match.group(1)
        if step_id not in results:
            # Unknown step: keep the literal ``{{En}}`` text.
            return match.group(0)
        return results[step_id].output

    return _PLACEHOLDER_RE.sub(_fill, value)
def _topological_levels(
    steps: list[ReWOOStep],
) -> list[list[ReWOOStep]] | None:
    """Partition ``steps`` into dependency levels for the worker.

    Level 0 holds every step with no known dependencies; level N holds the
    steps whose known dependencies all sit in levels 0..N-1. Within a level
    steps are ordered lexicographically by id (so "E10" sorts before "E2").
    A dependency on an id not present in ``steps`` is ignored for ordering;
    at execution time that placeholder is simply left unresolved.

    NOTE: steps are indexed by id, so if two steps share an id only the
    last one is scheduled.

    Returns ``None`` when no remaining step can be scheduled, i.e. the
    dependency graph contains a cycle (including a self-reference).
    """
    step_by_id = {step.id: step for step in steps}
    unscheduled = set(step_by_id)
    scheduled: set[str] = set()
    ordered_levels: list[list[ReWOOStep]] = []

    def _is_ready(step_id: str) -> bool:
        # Ready once every dependency naming a known step is scheduled.
        return all(
            dep in scheduled or dep not in step_by_id
            for dep in step_by_id[step_id].depends_on
        )

    while unscheduled:
        ready_ids = sorted(sid for sid in unscheduled if _is_ready(sid))
        if not ready_ids:
            # No progress possible -> the remaining steps form a cycle.
            return None
        ordered_levels.append([step_by_id[sid] for sid in ready_ids])
        scheduled.update(ready_ids)
        unscheduled.difference_update(ready_ids)
    return ordered_levels
async def _execute_level(
    deps: Dependencies,
    session: AgentSession,
    level: list[ReWOOStep],
    prior_results: dict[str, ReWOOStepResult],
    parallel: bool,
) -> dict[str, ReWOOStepResult]:
    """Run every step of one dependency level and collect their results.

    Steps run concurrently in an anyio task group when ``parallel`` is true
    and the level has more than one step; otherwise they run sequentially
    in list order. Each step writes its own entry into the shared result
    dict, which is returned keyed by step id.
    """
    collected: dict[str, ReWOOStepResult] = {}
    if not (parallel and len(level) > 1):
        for step in level:
            await _run_one_step(deps, session, step, prior_results, collected)
        return collected

    async with anyio.create_task_group() as task_group:
        for step in level:
            task_group.start_soon(
                _run_one_step, deps, session, step, prior_results, collected
            )
    return collected
async def _run_one_step(
    deps: Dependencies,
    session: AgentSession,
    step: ReWOOStep,
    prior_results: dict[str, ReWOOStepResult],
    out_results: dict[str, ReWOOStepResult],
) -> None:
    """Execute one plan step and record its outcome in ``out_results[step.id]``.

    ``{{En}}`` placeholders in ``step.args`` are resolved against
    ``prior_results`` first; the tool is then invoked through
    ``deps.runtime.step`` (step name ``rewoo_step_<id>``, call id
    ``rewoo_<id>``). Failures, whether a raised exception or a non-ok
    ``ToolResult``, are captured as the result's ``error`` instead of
    propagating, so sibling steps in a parallel level keep running.
    ``session`` is currently unused.
    """
    resolved_args = _substitute_placeholders(step.args, prior_results)
    try:
        tool_result: ToolResult = await deps.runtime.step(
            f"rewoo_step_{step.id}",
            deps.tools.call,
            step.tool,
            resolved_args,
            call_id=f"rewoo_{step.id}",
        )
    except Exception as exc:  # noqa: BLE001 — surface as step error
        # ``str(exc)`` is "" for message-less exceptions such as
        # ``TimeoutError()``. An empty error is falsy, so downstream the
        # step would read as a successful empty output; fall back to the
        # exception class name instead.
        out_results[step.id] = ReWOOStepResult(
            step_id=step.id,
            tool=step.tool,
            output="",
            error=str(exc) or type(exc).__name__,
        )
        return
    if tool_result.ok:
        out_results[step.id] = ReWOOStepResult(
            step_id=step.id,
            tool=step.tool,
            output=str(tool_result.output),
        )
    else:
        out_results[step.id] = ReWOOStepResult(
            step_id=step.id,
            tool=step.tool,
            output="",
            error=tool_result.error or tool_result.reason or "tool failed",
        )
# ---------------------------------------------------------------------------
# Plan parsing
# ---------------------------------------------------------------------------
def _load_json_array(text: str) -> list[Any] | None:
    """Best-effort extraction of a top-level JSON array from LLM output.

    Tries, in order: the text with surrounding markdown code fences
    stripped, then the span from the first ``[`` to the last ``]`` of the
    raw text (which recovers arrays wrapped in prose or in a single-line
    fence). Returns ``None`` when no candidate decodes to a list.
    """
    cleaned = text.strip()
    if cleaned.startswith("```"):
        lines = cleaned.split("\n")[1:]  # drop the opening fence (e.g. ```json)
        while lines and lines[-1].strip().startswith("```"):
            lines.pop()
        cleaned = "\n".join(lines).strip()
    candidates = [cleaned]
    start, end = text.find("["), text.rfind("]")
    if 0 <= start < end:
        candidates.append(text[start : end + 1])
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            continue
        if isinstance(parsed, list):
            return parsed
    return None


def _parse_rewoo_plan(text: str) -> ReWOOPlan:
    """Parse a ReWOO plan from the planner's raw text.

    Tolerant of markdown code fences and of prose around the JSON array.
    Items that are not objects, or lack a non-empty string ``tool``, are
    skipped; a missing/falsy ``id`` defaults to ``E<1-based position>``;
    non-dict ``args`` become ``{}``. Only the first step with a given id is
    kept: a duplicate id makes ``{{En}}`` ambiguous and, once steps are
    indexed by id for scheduling, would silently replace the earlier step.
    Returns an empty plan on parse failure (the caller's ``empty_plan``
    branch handles termination).
    """
    parsed = _load_json_array(text)
    if parsed is None:
        return ReWOOPlan(steps=[])
    steps: list[ReWOOStep] = []
    seen_ids: set[str] = set()
    for i, item in enumerate(parsed):
        if not isinstance(item, dict):
            continue
        sid = str(item.get("id") or f"E{i + 1}")
        tool = item.get("tool")
        if sid in seen_ids or not isinstance(tool, str) or not tool:
            continue
        args = item.get("args", {})
        if not isinstance(args, dict):
            args = {}
        seen_ids.add(sid)
        steps.append(ReWOOStep(id=sid, tool=tool, args=args))
    return ReWOOPlan(steps=steps)