"""ActorCritic: generator + adversarial critic, asymmetric by design.
Sutton & Barto 1998 (RL foundations); LLM-era papers Madaan et al.
2023 (Self-Refine — same model both roles), Gou et al. 2023 (CRITIC),
Sun et al. 2025 (CGI — separate critic model). 2026 production
literature recommends ActorCritic for **quality-critical work** —
code generation, security review, important written communications.
The pattern in one line: actor proposes; critic finds problems with a
*different prompt and ideally a different model*; actor revises.
Why a separate :class:`SelfRefine`?
-----------------------------------
:class:`SelfRefine` runs critic + refiner with the parent's same
model and same prompt template. ActorCritic earns its complexity
only when the actor and critic have *different blind spots* —
typically different models. We require both ``actor`` and
``critic`` :class:`Agent` instances; for same-model self-critique,
use :class:`SelfRefine`.
Pattern
-------
1. **Round 0 (actor).** ``actor.run(prompt)`` produces an initial
output.
2. **For each round up to ``max_rounds``:**
a. **Critic.** ``critic.run(critique_prompt)`` produces a
structured critique with an explicit ``score`` 0-1.
b. **Approval check.** If ``critique.score >=
approval_threshold``, terminate as approved.
c. **Refine.** ``actor.run(refine_prompt)`` produces a revised
output that addresses the critique. The new output replaces
the old.
3. **Max rounds reached without approval.** Return the current
output. Best we have.
Replay correctness
------------------
Each actor / critic invocation uses a deterministic session id
(``{parent}__actor_<round>`` / ``{parent}__critic_<round>``) so
replays of the parent reproduce the same sub-sessions.
Tuning
------
* ``max_rounds=3`` is the production sweet spot for code generation.
* ``approval_threshold=0.9`` is strict; lower to 0.85 for friendlier
convergence.
* Use **different models** for actor and critic. Claude Opus actor +
GPT-4o critic (or vice versa) is the canonical asymmetry.
Composition
-----------
* Inside :class:`Supervisor`: each worker can be an ActorCritic for
per-domain quality control (``coder`` worker uses ActorCritic for
code review).
* Inside :class:`Reflexion`: cross-session learning of which
critique patterns produce real improvements.
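
Example
-------
A minimal usage sketch. The ``Agent`` constructor arguments, import
paths, and the event-handling loop below are illustrative assumptions
about the surrounding framework, not its exact API::

    from yourpkg.agent.api import Agent            # assumed import path
    from yourpkg.architectures import ActorCritic  # assumed import path

    actor = Agent(model="claude-opus", system_prompt="You write code.")
    critic = Agent(model="gpt-4o", system_prompt="You review code.")

    ac = ActorCritic(
        actor=actor,
        critic=critic,
        max_rounds=3,
        approval_threshold=0.9,
    )

    # ActorCritic.run is an async generator of Events; drive it inside
    # whatever session / dependency machinery the framework provides.
    async for event in ac.run(session, deps, "Write a CSV parser."):
        handle(event)  # handle() is a placeholder for your event sink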
"""
from __future__ import annotations
import json
import re
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING
from pydantic import BaseModel, Field
from ..core.types import Event
from .base import AgentSession, Dependencies
from .helpers import SubagentInvocation
if TYPE_CHECKING:
from ..agent.api import Agent
DEFAULT_CRITIQUE_TEMPLATE = """\
You are reviewing the output below against the original task. Find
every issue you can: factual errors, missing requirements, edge
cases, security holes, unclear language. Be specific — cite the
section of the output you're criticizing.
Output ONLY a JSON object with this shape:
{{"issues": ["...", "..."], "score": 0.0-1.0, "summary": "..."}}
The score is your confidence the output fully solves the task:
- 1.0 = no issues, ship it
- 0.7-0.9 = mostly correct, minor gaps
- 0.4-0.6 = real problems, must revise
- 0.0-0.3 = wrong or missing core deliverable
Original task:
{prompt}
Output to review:
{output}
"""
DEFAULT_REFINE_TEMPLATE = """\
Revise your previous output based on the critique below. Address
every point in the critique. Output ONLY the revised version, no
preamble or commentary about what changed.
Original task:
{prompt}
Previous output:
{output}
Critique:
{issues_bulleted}
"""
class CriticOutput(BaseModel):
"""Structured critic verdict.
Parsed from the critic Agent's output. Falls back to a single-
issue blob with score 0.0 when JSON parsing fails so the loop
keeps making progress instead of crashing on a malformed reply.
"""
issues: list[str] = Field(default_factory=list)
score: float = Field(ge=0.0, le=1.0, default=0.0)
summary: str = ""
class ActorCritic:
"""Actor + adversarial critic with optional different models.
Constructor parameters:
* ``actor`` (required): the generating :class:`Agent`. Sees the
original prompt on round 0 and a refine prompt on subsequent
rounds.
* ``critic`` (required): the reviewing :class:`Agent`. Sees the
original prompt + the actor's current output and produces
structured JSON critique.
* ``max_rounds``: cap on critique-refine cycles after the
initial generation. Default 3.
* ``approval_threshold``: terminate when ``critique.score`` is
at or above this value. Default 0.9.
    * ``critique_template`` / ``refine_template``: override the
      default prompts. ``critique_template`` is formatted with
      ``{prompt}`` and ``{output}``; ``refine_template`` is formatted
      with ``{prompt}``, ``{output}``, ``{critique}``, and
      ``{issues_bulleted}``.
"""
name = "actor-critic"
def __init__(
self,
*,
actor: Agent,
critic: Agent,
max_rounds: int = 3,
approval_threshold: float = 0.9,
critique_template: str | None = None,
refine_template: str | None = None,
) -> None:
if max_rounds < 1:
raise ValueError("max_rounds must be >= 1")
if not 0.0 <= approval_threshold <= 1.0:
raise ValueError(
"approval_threshold must be in [0.0, 1.0]"
)
self._actor = actor
self._critic = critic
self._max_rounds = max_rounds
self._threshold = approval_threshold
self._critique_template = (
critique_template or DEFAULT_CRITIQUE_TEMPLATE
)
self._refine_template = (
refine_template or DEFAULT_REFINE_TEMPLATE
)
def declared_workers(self) -> dict[str, Agent]:
return {"actor": self._actor, "critic": self._critic}
async def run(
self,
session: AgentSession,
deps: Dependencies,
prompt: str,
) -> AsyncIterator[Event]:
# === Round 0: initial generation by actor ===
yield Event.architecture_event(
session.id,
"actor_critic.actor_started",
round=0,
phase="generate",
)
actor_inv = SubagentInvocation(
self._actor, prompt, session_id=f"{session.id}__actor_0"
)
async for ev in actor_inv.events():
yield ev
actor_result = actor_inv.result
current_output = str(actor_result.get("output", ""))
session.output = current_output
session.turns += int(actor_result.get("turns", 0) or 0)
if bool(actor_result.get("interrupted", False)):
session.interrupted = True
session.interruption_reason = (
f"actor:round_0:"
f"{actor_result.get('interruption_reason') or 'unknown'}"
)
return
yield Event.architecture_event(
session.id,
"actor_critic.actor_completed",
round=0,
phase="generate",
)
# === Critique → refine loop ===
for round_num in range(1, self._max_rounds + 1):
status = await deps.budget.allows_step()
if status.blocked:
session.interrupted = True
session.interruption_reason = (
f"budget:{status.reason}"
)
yield Event.budget_exceeded(session.id, status)
return
if status.warn:
yield Event.budget_warning(session.id, status)
# --- Critic ---
yield Event.architecture_event(
session.id,
"actor_critic.critic_started",
round=round_num,
)
critique_prompt = self._critique_template.format(
prompt=prompt,
output=current_output,
)
critic_inv = SubagentInvocation(
self._critic,
critique_prompt,
session_id=f"{session.id}__critic_{round_num}",
)
async for ev in critic_inv.events():
yield ev
critic_result = critic_inv.result
session.turns += int(critic_result.get("turns", 0) or 0)
if bool(critic_result.get("interrupted", False)):
# Critic interrupted; treat current output as best
# we have and stop.
session.interrupted = True
session.interruption_reason = (
f"critic:round_{round_num}:"
f"{critic_result.get('interruption_reason') or 'unknown'}"
)
return
critique = _parse_critique(
str(critic_result.get("output", ""))
)
yield Event.architecture_event(
session.id,
"actor_critic.critique",
round=round_num,
score=critique.score,
issues=critique.issues,
summary=critique.summary,
)
if critique.score >= self._threshold:
yield Event.architecture_event(
session.id,
"actor_critic.approved",
round=round_num,
score=critique.score,
)
return
if round_num >= self._max_rounds:
yield Event.architecture_event(
session.id,
"actor_critic.max_rounds_reached",
rounds=round_num,
final_score=critique.score,
)
return
# --- Refine via actor ---
yield Event.architecture_event(
session.id,
"actor_critic.actor_started",
round=round_num,
phase="refine",
)
issues_bulleted = "\n".join(
f"- {issue}" for issue in critique.issues
) or "(no specific issues listed; general improvement)"
refine_prompt = self._refine_template.format(
prompt=prompt,
output=current_output,
critique=critique.summary or "",
issues_bulleted=issues_bulleted,
)
refine_inv = SubagentInvocation(
self._actor,
refine_prompt,
session_id=f"{session.id}__actor_{round_num}",
)
async for ev in refine_inv.events():
yield ev
refine_result = refine_inv.result
session.turns += int(refine_result.get("turns", 0) or 0)
if bool(refine_result.get("interrupted", False)):
session.interrupted = True
session.interruption_reason = (
f"actor:round_{round_num}:"
f"{refine_result.get('interruption_reason') or 'unknown'}"
)
return
current_output = str(refine_result.get("output", ""))
session.output = current_output
yield Event.architecture_event(
session.id,
"actor_critic.actor_completed",
round=round_num,
phase="refine",
)
# ---------------------------------------------------------------------------
# Critique parser
# ---------------------------------------------------------------------------
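# Fallback score extractor for non-JSON critic replies: matches e.g.
# "score: 0.85", "Score = 1", or "score:0.4" anywhere in free-form text.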
_SCORE_RE = re.compile(
r"score\s*[:=]\s*([0-9]*\.?[0-9]+)", re.IGNORECASE
)
def _parse_critique(text: str) -> CriticOutput:
"""Best-effort parse of critic output.
Tries: (1) raw JSON, (2) JSON inside markdown code fences,
(3) regex fallback that extracts a score and uses the full
text as a single issue.
Returns a default ``CriticOutput`` (score 0.0, single issue =
raw text) when parsing fails entirely so the loop keeps making
progress on the next refine pass instead of crashing.
"""
cleaned = text.strip()
# (2) Strip markdown code fences if present.
if cleaned.startswith("```"):
lines = cleaned.split("\n")
if lines[0].startswith("```"):
lines = lines[1:]
# Trailing fence on its own line.
while lines and lines[-1].strip().startswith("```"):
lines = lines[:-1]
cleaned = "\n".join(lines).strip()
# (1) Try direct JSON parse.
parsed: object
try:
parsed = json.loads(cleaned)
except (json.JSONDecodeError, ValueError):
parsed = None
if isinstance(parsed, dict):
try:
issues_raw = parsed.get("issues", []) or []
issues = [str(i) for i in issues_raw if i]
score_raw = parsed.get("score", 0.0)
score = max(0.0, min(1.0, float(score_raw)))
summary = str(parsed.get("summary", ""))
return CriticOutput(
issues=issues, score=score, summary=summary
)
except (TypeError, ValueError):
pass
# (3) Regex fallback for the score; whole text becomes one issue.
match = _SCORE_RE.search(text)
score = 0.0
if match is not None:
try:
score = max(0.0, min(1.0, float(match.group(1))))
except ValueError:
score = 0.0
return CriticOutput(
issues=[text.strip()] if text.strip() else [],
score=score,
summary="",
)
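

# Illustrative _parse_critique behavior (the values follow directly from
# the logic above):
#
#   _parse_critique('{"issues": ["x"], "score": 0.8, "summary": "ok"}')
#       -> CriticOutput(issues=["x"], score=0.8, summary="ok")
#
#   _parse_critique("Mostly correct but untested. score: 0.7")
#       -> CriticOutput(issues=["Mostly correct but untested. score: 0.7"],
#                       score=0.7, summary="")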