Source code for jeevesagent.architecture.actor_critic

"""ActorCritic: generator + adversarial critic, asymmetric by design.

Lineage: Sutton & Barto 1998 (RL foundations); LLM-era papers:
Madaan et al. 2023 (Self-Refine, same model in both roles), Gou et
al. 2023 (CRITIC), Sun et al. 2025 (CGI, separate critic model).
2026 production literature recommends ActorCritic for
**quality-critical work**: code generation, security review,
important written communications.

The pattern in one line: actor proposes; critic finds problems with a
*different prompt and ideally a different model*; actor revises.

Why a separate :class:`SelfRefine`?
-----------------------------------
:class:`SelfRefine` runs its critic and refiner with the same model
and the same prompt template as the parent. ActorCritic earns its
complexity only when the actor and critic have *different blind
spots*, typically different models. We require both ``actor`` and
``critic`` to be :class:`Agent` instances; for same-model
self-critique, use :class:`SelfRefine`.

Pattern
-------

1. **Round 0 (actor).** ``actor.run(prompt)`` produces an initial
   output.
2. **For each round up to ``max_rounds``:**

   a. **Critic.** ``critic.run(critique_prompt)`` produces a
      structured critique with an explicit ``score`` 0-1.
   b. **Approval check.** If ``critique.score >=
      approval_threshold``, terminate as approved.
   c. **Refine.** ``actor.run(refine_prompt)`` produces a revised
      output that addresses the critique. The new output replaces
      the old.

3. **Max rounds reached without approval.** Return the current
   output. The final round's critique is reported but not acted on:
   there is no refine pass after the last critique. Best we have.
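
A compressed sketch of the control flow (event emission, budget
checks, and interruption handling omitted; the names are shorthand
for the real code below)::

    output = actor.run(prompt)                       # round 0
    for n in 1 .. max_rounds:
        critique = parse(critic.run(critique_prompt))
        if critique.score >= approval_threshold:
            return output                            # approved
        if n == max_rounds:
            return output                            # out of rounds
        output = actor.run(refine_prompt)            # revise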

Replay correctness
------------------
Each actor / critic invocation uses a deterministic session id
(``{parent}__actor_<round>`` / ``{parent}__critic_<round>``) so
replays of the parent reproduce the same sub-sessions.
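
For example, a parent session ``s1`` that runs two critique rounds
spawns, in order::

    s1__actor_0, s1__critic_1, s1__actor_1, s1__critic_2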

Tuning
------
* ``max_rounds=3`` is the production sweet spot for code generation.
* ``approval_threshold=0.9`` is strict; lower to 0.85 for friendlier
  convergence.
* Use **different models** for actor and critic. Claude Opus actor +
  GPT-4o critic (or vice versa) is the canonical asymmetry.
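
A minimal construction sketch (the two ``Agent`` values are
placeholders built elsewhere; only the ``ActorCritic`` keyword
arguments are defined by this module)::

    reviewer = ActorCritic(
        actor=opus_agent,          # generating Agent
        critic=gpt4o_agent,        # reviewing Agent, different model
        max_rounds=3,
        approval_threshold=0.85,   # friendlier convergence
    )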

Composition
-----------
* Inside :class:`Supervisor`: each worker can be an ActorCritic for
  per-domain quality control (``coder`` worker uses ActorCritic for
  code review).
* Inside :class:`Reflexion`: cross-session learning of which
  critique patterns produce real improvements.
"""

from __future__ import annotations

import json
import re
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING

from pydantic import BaseModel, Field

from ..core.types import Event
from .base import AgentSession, Dependencies
from .helpers import SubagentInvocation

if TYPE_CHECKING:
    from ..agent.api import Agent


DEFAULT_CRITIQUE_TEMPLATE = """\
You are reviewing the output below against the original task. Find
every issue you can: factual errors, missing requirements, edge
cases, security holes, unclear language. Be specific — cite the
section of the output you're criticizing.

Output ONLY a JSON object with this shape:

{{"issues": ["...", "..."], "score": 0.0-1.0, "summary": "..."}}

The score is your confidence the output fully solves the task:
- 1.0 = no issues, ship it
- 0.7-0.9 = mostly correct, minor gaps
- 0.4-0.6 = real problems, must revise
- 0.0-0.3 = wrong or missing core deliverable

Original task:
{prompt}

Output to review:
{output}
"""


DEFAULT_REFINE_TEMPLATE = """\
Revise your previous output based on the critique below. Address
every point in the critique. Output ONLY the revised version, no
preamble or commentary about what changed.

Original task:
{prompt}

Previous output:
{output}

Critique:
{issues_bulleted}
"""


class CriticOutput(BaseModel):
    """Structured critic verdict.

    Parsed from the critic Agent's output. Falls back to a
    single-issue blob with score 0.0 when JSON parsing fails so the
    loop keeps making progress instead of crashing on a malformed
    reply.
    """

    issues: list[str] = Field(default_factory=list)
    score: float = Field(ge=0.0, le=1.0, default=0.0)
    summary: str = ""


class ActorCritic:
    """Actor + adversarial critic with optional different models.

    Constructor parameters:

    * ``actor`` (required): the generating :class:`Agent`. Sees the
      original prompt on round 0 and a refine prompt on subsequent
      rounds.
    * ``critic`` (required): the reviewing :class:`Agent`. Sees the
      original prompt + the actor's current output and produces
      structured JSON critique.
    * ``max_rounds``: cap on critique-refine cycles after the
      initial generation. Default 3.
    * ``approval_threshold``: terminate when ``critique.score`` is
      at or above this value. Default 0.9.
    * ``critique_template`` / ``refine_template``: override the
      default prompts. Templates use ``{prompt}``, ``{output}``,
      ``{critique}``, ``{issues_bulleted}``.
    """

    name = "actor-critic"

    def __init__(
        self,
        *,
        actor: Agent,
        critic: Agent,
        max_rounds: int = 3,
        approval_threshold: float = 0.9,
        critique_template: str | None = None,
        refine_template: str | None = None,
    ) -> None:
        if max_rounds < 1:
            raise ValueError("max_rounds must be >= 1")
        if not 0.0 <= approval_threshold <= 1.0:
            raise ValueError(
                "approval_threshold must be in [0.0, 1.0]"
            )
        self._actor = actor
        self._critic = critic
        self._max_rounds = max_rounds
        self._threshold = approval_threshold
        self._critique_template = (
            critique_template or DEFAULT_CRITIQUE_TEMPLATE
        )
        self._refine_template = (
            refine_template or DEFAULT_REFINE_TEMPLATE
        )

    def declared_workers(self) -> dict[str, Agent]:
        return {"actor": self._actor, "critic": self._critic}

    async def run(
        self,
        session: AgentSession,
        deps: Dependencies,
        prompt: str,
    ) -> AsyncIterator[Event]:
        # === Round 0: initial generation by actor ===
        yield Event.architecture_event(
            session.id,
            "actor_critic.actor_started",
            round=0,
            phase="generate",
        )
        actor_inv = SubagentInvocation(
            self._actor, prompt, session_id=f"{session.id}__actor_0"
        )
        async for ev in actor_inv.events():
            yield ev
        actor_result = actor_inv.result
        current_output = str(actor_result.get("output", ""))
        session.output = current_output
        session.turns += int(actor_result.get("turns", 0) or 0)
        if bool(actor_result.get("interrupted", False)):
            session.interrupted = True
            session.interruption_reason = (
                f"actor:round_0:"
                f"{actor_result.get('interruption_reason') or 'unknown'}"
            )
            return
        yield Event.architecture_event(
            session.id,
            "actor_critic.actor_completed",
            round=0,
            phase="generate",
        )

        # === Critique → refine loop ===
        for round_num in range(1, self._max_rounds + 1):
            status = await deps.budget.allows_step()
            if status.blocked:
                session.interrupted = True
                session.interruption_reason = f"budget:{status.reason}"
                yield Event.budget_exceeded(session.id, status)
                return
            if status.warn:
                yield Event.budget_warning(session.id, status)

            # --- Critic ---
            yield Event.architecture_event(
                session.id,
                "actor_critic.critic_started",
                round=round_num,
            )
            critique_prompt = self._critique_template.format(
                prompt=prompt,
                output=current_output,
            )
            critic_inv = SubagentInvocation(
                self._critic,
                critique_prompt,
                session_id=f"{session.id}__critic_{round_num}",
            )
            async for ev in critic_inv.events():
                yield ev
            critic_result = critic_inv.result
            session.turns += int(critic_result.get("turns", 0) or 0)
            if bool(critic_result.get("interrupted", False)):
                # Critic interrupted; treat current output as best
                # we have and stop.
                session.interrupted = True
                session.interruption_reason = (
                    f"critic:round_{round_num}:"
                    f"{critic_result.get('interruption_reason') or 'unknown'}"
                )
                return

            critique = _parse_critique(
                str(critic_result.get("output", ""))
            )
            yield Event.architecture_event(
                session.id,
                "actor_critic.critique",
                round=round_num,
                score=critique.score,
                issues=critique.issues,
                summary=critique.summary,
            )
            if critique.score >= self._threshold:
                yield Event.architecture_event(
                    session.id,
                    "actor_critic.approved",
                    round=round_num,
                    score=critique.score,
                )
                return
            if round_num >= self._max_rounds:
                yield Event.architecture_event(
                    session.id,
                    "actor_critic.max_rounds_reached",
                    rounds=round_num,
                    final_score=critique.score,
                )
                return

            # --- Refine via actor ---
            yield Event.architecture_event(
                session.id,
                "actor_critic.actor_started",
                round=round_num,
                phase="refine",
            )
            issues_bulleted = "\n".join(
                f"- {issue}" for issue in critique.issues
            ) or "(no specific issues listed; general improvement)"
            refine_prompt = self._refine_template.format(
                prompt=prompt,
                output=current_output,
                critique=critique.summary or "",
                issues_bulleted=issues_bulleted,
            )
            refine_inv = SubagentInvocation(
                self._actor,
                refine_prompt,
                session_id=f"{session.id}__actor_{round_num}",
            )
            async for ev in refine_inv.events():
                yield ev
            refine_result = refine_inv.result
            session.turns += int(refine_result.get("turns", 0) or 0)
            if bool(refine_result.get("interrupted", False)):
                session.interrupted = True
                session.interruption_reason = (
                    f"actor:round_{round_num}:"
                    f"{refine_result.get('interruption_reason') or 'unknown'}"
                )
                return
            current_output = str(refine_result.get("output", ""))
            session.output = current_output
            yield Event.architecture_event(
                session.id,
                "actor_critic.actor_completed",
                round=round_num,
                phase="refine",
            )


# ---------------------------------------------------------------------------
# Critique parser
# ---------------------------------------------------------------------------

_SCORE_RE = re.compile(
    r"score\s*[:=]\s*([0-9]*\.?[0-9]+)", re.IGNORECASE
)


def _parse_critique(text: str) -> CriticOutput:
    """Best-effort parse of critic output.

    Tries: (1) raw JSON, (2) JSON inside markdown code fences,
    (3) regex fallback that extracts a score and uses the full text
    as a single issue. Returns a default ``CriticOutput`` (score
    0.0, single issue = raw text) when parsing fails entirely so the
    loop keeps making progress on the next refine pass instead of
    crashing.
    """
    cleaned = text.strip()

    # (2) Strip markdown code fences if present.
    if cleaned.startswith("```"):
        lines = cleaned.split("\n")
        if lines[0].startswith("```"):
            lines = lines[1:]
        # Trailing fence on its own line.
        while lines and lines[-1].strip().startswith("```"):
            lines = lines[:-1]
        cleaned = "\n".join(lines).strip()

    # (1) Try direct JSON parse.
    parsed: object
    try:
        parsed = json.loads(cleaned)
    except (json.JSONDecodeError, ValueError):
        parsed = None

    if isinstance(parsed, dict):
        try:
            issues_raw = parsed.get("issues", []) or []
            issues = [str(i) for i in issues_raw if i]
            score_raw = parsed.get("score", 0.0)
            score = max(0.0, min(1.0, float(score_raw)))
            summary = str(parsed.get("summary", ""))
            return CriticOutput(
                issues=issues, score=score, summary=summary
            )
        except (TypeError, ValueError):
            pass

    # (3) Regex fallback for the score; whole text becomes one issue.
    match = _SCORE_RE.search(text)
    score = 0.0
    if match is not None:
        try:
            score = max(0.0, min(1.0, float(match.group(1))))
        except ValueError:
            score = 0.0
    return CriticOutput(
        issues=[text.strip()] if text.strip() else [],
        score=score,
        summary="",
    )
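
# Illustrative parses, hand-traced against the fallback chain above
# (shown as comments, not executed; inputs are made-up critic replies):
#
#   '{"issues": ["off-by-one"], "score": 0.6, "summary": "fix loop"}'
#       -> CriticOutput(issues=["off-by-one"], score=0.6, summary="fix loop")
#   '```json\n{"score": 1.0}\n```'                      (fenced JSON)
#       -> CriticOutput(issues=[], score=1.0, summary="")
#   "Overall score: 0.7, needs edge-case tests"         (regex fallback)
#       -> CriticOutput(issues=["Overall score: 0.7, needs edge-case tests"],
#                       score=0.7, summary="")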