"""Supervisor: workers + a ``delegate`` tool injected into the loop.
Anthropic Multi-Agent Research System (2026 internal report) +
Anthropic Agent Teams (Feb 2026). The 2026 production consensus:
**hierarchical Supervisor is the multi-agent pattern that earns its
cost in production.** Anthropic reports +90.2% on their MA research
benchmark vs single-agent baseline.
Pattern
-------
The supervisor itself runs an architecture (default
:class:`ReAct`). Its tool host is augmented with one extra tool:
``delegate(worker, instructions)``. When the supervising model
calls ``delegate``, the named worker :class:`Agent` runs to
completion with the supervisor's instructions and returns its final
answer as the tool result.
Because :class:`ReAct`'s tool dispatch is already parallel
(:func:`anyio.create_task_group` over all tool calls in a turn),
**the supervisor gets parallel delegation for free** — emit two
``delegate`` calls in one turn and both workers run concurrently.
Replay correctness
------------------
Each ``delegate`` call is wrapped by ``runtime.step`` at the
parent's tool dispatch layer (see ReAct), so the worker's full
:class:`RunResult.output` is journaled in the parent's session.
Replays return the cached worker output without re-running the
worker. The worker is itself an :class:`Agent` and uses a
collision-free session id (parent + worker name + a fresh ULID)
when it does run.
Composition
-----------
* Workers can be any architecture themselves (DeepAgent worker for
research, ActorCritic worker for code, plain Agent for simple
specialists).
* Workers can be supervisors (nested teams).
* Wrap Supervisor in Reflexion for cross-session learning of which
worker handles which intent best.
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from dataclasses import replace
from typing import TYPE_CHECKING
import anyio
from anyio.streams.memory import MemoryObjectSendStream
from ..core.context import get_run_context
from ..core.ids import new_id
from ..core.types import Event
from ..tools.registry import Tool
from .base import AgentSession, Architecture, Dependencies
from .helpers import SubagentInvocation
from .react import ReAct
from .tool_host_wrappers import ExtendedToolHost
if TYPE_CHECKING:
from ..agent.api import Agent
# Default system-prompt section for the supervising model. It is appended
# to the owning agent's own instructions at run start (see Supervisor.run),
# with ``{worker_descriptions}`` filled in from each worker Agent's
# ``instructions`` — one bullet per registered worker.
DEFAULT_SUPERVISOR_TEMPLATE = """\
You are a supervisor coordinating specialist worker agents.
For each task you receive:
1. Decide which workers are needed.
2. Call `delegate(worker, instructions)` to invoke a specialist.
3. Each worker runs independently and returns its final answer.
4. Either synthesize worker outputs into a unified response, OR
call `forward_message(worker)` if a single worker's output IS
already the final answer the user wants. Forwarding skips a
paraphrase round-trip: the worker's output is returned verbatim
as YOUR final response. End your turn immediately after a
forward_message call.
You can delegate multiple workers in a single turn — they will run
in parallel. Be specific in the ``instructions`` you pass; workers
do NOT see the user's original message, only what you write.
Available workers:
{worker_descriptions}
"""
class Supervisor:
    """Coordinator + workers, glued by a ``delegate`` tool.

    The supervisor's base architecture (default :class:`ReAct`) sees
    a fresh ``delegate(worker, instructions)`` tool that routes calls
    to the named worker :class:`Agent`. Worker outputs come back as
    tool results just like any other tool call.

    Constructor
    -----------
    * ``workers``: dict mapping role-names to fully-built
      :class:`Agent` instances. Names must be valid identifiers
      (the model emits them as the ``worker`` argument).
    * ``base``: the architecture the supervisor itself runs.
      Default :class:`ReAct`. Wrap inside :class:`Reflexion` to
      learn delegation patterns across runs.
    * ``instructions_template``: format string with
      ``{worker_descriptions}``. Default teaches the supervisor
      to delegate effectively. The agent's own ``instructions``
      are *prepended* (so domain context survives).
    * ``delegate_tool_name``: defaults to ``"delegate"``. Customize
      to avoid clashes with user-defined tools that happen to have
      the same name.
    * ``forward_tool_name``: defaults to ``"forward_message"``. The
      supervisor calls this with a worker name to return that
      worker's last output VERBATIM as the supervisor's final
      response. Skips a synthesis round-trip — the
      `langchain.com/blog/benchmarking-multi-agent-architectures`_
      benchmark showed +50% quality on tasks where the supervisor
      would otherwise paraphrase a worker's output.
    """

    # Architecture registry name.
    name = "supervisor"

    def __init__(
        self,
        *,
        workers: dict[str, Agent],
        base: Architecture | None = None,
        instructions_template: str | None = None,
        delegate_tool_name: str = "delegate",
        forward_tool_name: str = "forward_message",
    ) -> None:
        # A supervisor with no workers has nothing to delegate to —
        # fail fast at construction time.
        if not workers:
            raise ValueError("Supervisor requires at least one worker")
        # Copy the mapping so later caller-side mutation cannot change
        # the registered worker pool behind our back.
        self._workers: dict[str, Agent] = dict(workers)
        self._base: Architecture = base if base is not None else ReAct()
        self._template = (
            instructions_template or DEFAULT_SUPERVISOR_TEMPLATE
        )
        self._delegate_name = delegate_tool_name
        self._forward_name = forward_tool_name

    def declared_workers(self) -> dict[str, Agent]:
        """Return a shallow copy of the registered worker mapping."""
        return dict(self._workers)

    def add_worker(self, name: str, agent: Agent) -> None:
        """Register a worker between runs.

        Safe to call between :meth:`Agent.run` invocations on the
        agent that owns this supervisor; the new worker becomes
        available for ``delegate(name, ...)`` on the next run.
        Calling mid-run is undefined — the supervisor's prompt is
        composed at run start.

        Raises ``ValueError`` if ``name`` is empty or not a valid
        Python identifier (the model emits worker names verbatim).
        """
        if not name or not name.isidentifier():
            raise ValueError(
                f"worker name {name!r} must be a valid identifier"
            )
        self._workers[name] = agent

    def remove_worker(self, name: str) -> Agent | None:
        """Unregister a worker by name. Returns the removed Agent
        if it was registered, ``None`` otherwise. Same lifecycle
        rules as :meth:`add_worker`."""
        return self._workers.pop(name, None)

    async def run(
        self,
        session: AgentSession,
        deps: Dependencies,
        prompt: str,
    ) -> AsyncIterator[Event]:
        """Run the supervisor loop, yielding events as they occur.

        Yields the base architecture's events interleaved with any
        worker events forwarded through the shared channel.
        """
        # 1. Set up a shared event channel so worker events emitted
        #    from inside the delegate tool can stream through to the
        #    supervisor's outer generator alongside the base
        #    architecture's events. Without this, MODEL_CHUNK and
        #    TOOL_CALL events from the workers would be dropped on
        #    the floor.
        send_chan, recv_chan = anyio.create_memory_object_stream[Event](
            max_buffer_size=128
        )
        # 2. Build the delegate + forward_message tools.
        #    - delegate routes to workers and forwards events.
        #    - forward_message lets the supervisor return a
        #      worker's last output verbatim, bypassing a
        #      paraphrase round-trip.
        #    Both tools share the ``last_outputs`` dict (delegate
        #    writes, forward_message reads). ``forward_request``
        #    captures the worker text the supervisor wants forwarded;
        #    we override session.output with it after the base
        #    architecture completes.
        last_outputs: dict[str, str] = {}
        forward_request: dict[str, str] = {}
        delegate_tool = _make_delegate_tool(
            self._workers,
            session.id,
            tool_name=self._delegate_name,
            event_sink=send_chan,
            last_outputs=last_outputs,
        )
        forward_tool = _make_forward_message_tool(
            last_outputs=last_outputs,
            forward_request=forward_request,
            tool_name=self._forward_name,
            worker_names=list(self._workers.keys()),
        )
        # 3. Wrap the parent's ToolHost so the model sees `delegate`
        #    + `forward_message` alongside whatever tools the parent
        #    already had.
        wrapped_host = ExtendedToolHost(
            deps.tools, [delegate_tool, forward_tool]
        )
        # 4. Compose instructions: user's domain prompt + supervisor
        #    template (with worker descriptions). Worker descriptions
        #    use each worker Agent's own ``instructions`` so the
        #    supervisor knows what each one does.
        worker_lines = []
        for wname, wagent in self._workers.items():
            desc = (wagent.instructions or "").strip()
            # Trim long instructions so the supervisor prompt stays
            # focused. 200 chars is enough for "you are a Python
            # coder with access to fs and bash" style summaries.
            if len(desc) > 200:
                desc = desc[:197] + "..."
            worker_lines.append(
                f" - {wname}: {desc or '(no description)'}"
            )
        worker_descriptions = "\n".join(worker_lines)
        supervisor_section = self._template.format(
            worker_descriptions=worker_descriptions
        )
        composed_instructions = (
            f"{session.instructions}\n\n---\n\n{supervisor_section}"
            if session.instructions
            else supervisor_section
        )
        # Remember the pre-composition instructions so we can restore
        # them once the base architecture finishes (or fails).
        original_instructions = session.instructions
        session.instructions = composed_instructions
        sup_deps = replace(deps, tools=wrapped_host)
        yield Event.architecture_event(
            session.id,
            "supervisor.workers_ready",
            workers=list(self._workers.keys()),
        )

        # 5. Run the base architecture in a background task; both
        #    its events AND any worker events emitted from the
        #    delegate tool flow through the shared channel. We yield
        #    from the channel concurrently.
        async def _run_base() -> None:
            try:
                async for event in self._base.run(
                    session, sup_deps, prompt
                ):
                    await send_chan.send(event)
            finally:
                # Closing the send half ends the ``async for`` over
                # recv_chan below, which lets the task group exit.
                await send_chan.aclose()

        try:
            async with anyio.create_task_group() as tg:
                tg.start_soon(_run_base)
                async with recv_chan:
                    async for ev in recv_chan:
                        yield ev
        finally:
            # Restore the original instructions on session even
            # though the session is single-use; harmless and keeps
            # the abstraction clean for tests that re-use sessions.
            session.instructions = original_instructions
        # If the supervisor model called forward_message at any
        # point, override the final output with the captured worker
        # text. The model's own final assistant message (typically
        # "[done]" or similar after the forward call) is discarded.
        if "output" in forward_request:
            session.output = forward_request["output"]
            yield Event.architecture_event(
                session.id,
                "supervisor.forwarded",
                worker=forward_request.get("worker", ""),
            )
        yield Event.architecture_event(
            session.id,
            "supervisor.completed",
            workers=list(self._workers.keys()),
        )
# ---------------------------------------------------------------------------
# delegate tool + tool-host wrapper
# ---------------------------------------------------------------------------
def _make_delegate_tool(
    workers: dict[str, Agent],
    parent_session_id: str,
    *,
    tool_name: str,
    event_sink: MemoryObjectSendStream[Event] | None = None,
    last_outputs: dict[str, str] | None = None,
) -> Tool:
    """Build a :class:`Tool` whose ``execute`` routes to the named
    worker :class:`Agent` and returns its final output.

    Each invocation generates a fresh ULID-suffixed session_id for
    the worker; replay correctness for the parent comes from the
    parent's own runtime journal caching the tool result, so the
    worker's session_id only matters during the first execution.

    When ``event_sink`` is provided, the worker's streaming events
    (model chunks, nested tool calls, tool results, architecture
    progress) are forwarded into the channel so the supervisor's
    outer ``stream(...)`` consumer sees token-by-token output. When
    ``None``, the worker's events are silently dropped (legacy
    behaviour, kept as a fallback).

    When ``last_outputs`` is provided, each worker's final answer is
    recorded under its name so the ``forward_message`` tool can
    return it verbatim later.
    """

    async def _delegate(worker: str, instructions: str) -> str:
        agent = workers.get(worker)
        if agent is None:
            # Strict-schema providers reject unknown enum values, but
            # stay defensive for providers that do not enforce enums.
            known = ", ".join(sorted(workers))
            return (
                f"Error: unknown worker {worker!r}. Known: {known}"
            )
        # Collision-free worker session id: parent id + worker name
        # + a fresh ULID, so concurrent delegations never clash.
        suffix = new_id("del")
        worker_session_id = (
            f"{parent_session_id}__delegate_{worker}_{suffix}"
        )
        if event_sink is None:
            # No event sink → fall back to plain run() (events lost).
            # Inherit the parent's RunContext (user_id + metadata) so
            # the worker runs in the same namespace partition as the
            # supervisor; ``session_id`` is the worker-specific one
            # we just derived, overriding the parent's session.
            result = await agent.run(
                instructions,
                session_id=worker_session_id,
                context=get_run_context(),
            )
            # Coerce to str so this path honours the tool's declared
            # ``-> str`` contract, matching the str(...) coercion on
            # the streaming path below.
            output = str(result.output)
            if last_outputs is not None:
                last_outputs[worker] = output
            return output
        # Stream worker events into the supervisor's shared channel
        # using a clone of the send half (clone keeps the channel
        # alive past this tool call's lifetime).
        invocation = SubagentInvocation(
            agent, instructions, session_id=worker_session_id
        )
        async with event_sink.clone() as sink:
            async for ev in invocation.events():
                await sink.send(ev)
        output = str(invocation.result.get("output", ""))
        if last_outputs is not None:
            last_outputs[worker] = output
        return output

    worker_names = list(workers.keys())
    # Short per-worker blurbs for the tool description; the full
    # (200-char) versions live in the supervisor system prompt.
    worker_descriptions = "\n".join(
        f" - {name}: {(a.instructions or '').strip()[:120]}"
        for name, a in workers.items()
    )
    return Tool(
        name=tool_name,
        description=(
            "Delegate a subtask to a named specialist worker. "
            "The worker runs independently and returns its final "
            "answer as a string.\n\n"
            f"Available workers:\n{worker_descriptions}"
        ),
        fn=_delegate,
        input_schema={
            "type": "object",
            "properties": {
                "worker": {
                    "type": "string",
                    # Strict-schema providers (Anthropic, OpenAI
                    # strict mode) reject calls outside this list,
                    # so hallucinated worker names never reach our
                    # tool implementation.
                    "enum": worker_names,
                    "description": (
                        "Name of the worker to delegate to. "
                        "Must be one of: "
                        f"{', '.join(worker_names)}."
                    ),
                },
                "instructions": {
                    "type": "string",
                    "description": (
                        "Task description for the worker. Be "
                        "specific — the worker does not see the "
                        "user's original message."
                    ),
                },
            },
            "required": ["worker", "instructions"],
        },
    )
def _make_forward_message_tool(
    *,
    last_outputs: dict[str, str],
    forward_request: dict[str, str],
    tool_name: str,
    worker_names: list[str],
) -> Tool:
    """Construct the ``forward_message`` tool.

    The delegate tool records each worker's final answer in
    ``last_outputs``; this tool looks that answer up and stashes it
    in ``forward_request``. After the base architecture finishes,
    the supervisor's run loop checks ``forward_request`` and, when
    set, substitutes the stashed worker text for the model's own
    final message — skipping the synthesis round-trip entirely.
    """

    async def _forward(worker: str) -> str:
        # Forwarding is only meaningful after a delegate call has
        # captured output for this worker — otherwise tell the model
        # what to do instead of failing silently.
        if worker not in last_outputs:
            known = ", ".join(sorted(last_outputs)) or "(none yet)"
            return (
                f"Error: no captured output for worker {worker!r}. "
                f"You must call delegate({worker}, ...) first. "
                f"Workers with captured output: {known}"
            )
        forward_request["output"] = last_outputs[worker]
        forward_request["worker"] = worker
        return (
            f"[forward_message recorded — {worker}'s last output "
            "will be returned verbatim as the final response. End "
            "your turn now without writing any additional text.]"
        )

    schema = {
        "type": "object",
        "properties": {
            "worker": {
                "type": "string",
                # Constrains to the actual worker pool. The tool
                # body above still checks ``last_outputs`` so a
                # forward before any delegate returns a clear error.
                "enum": worker_names,
                "description": (
                    "Name of the worker whose last output "
                    "should be forwarded. Must be one of: "
                    f"{', '.join(worker_names)}."
                ),
            },
        },
        "required": ["worker"],
    }
    return Tool(
        name=tool_name,
        description=(
            "Return a worker's last delegated output VERBATIM as "
            "the supervisor's final response. Use this when one "
            "worker already produced exactly what the user asked "
            "for and synthesis would just paraphrase it. End your "
            "turn immediately after calling — no additional text."
        ),
        fn=_forward,
        input_schema=schema,
    )