Core Engine

APEX Synthesizer

APEX replaces the original plain synthesizer. It attaches a confidence score (0.0–1.0) to every result, can cite which agent contributed each fact, and can optionally call tools mid-synthesis to verify claims (experimental).

How it works

  1. All agent results are collected after parallel execution
  2. APEX receives the original query + all agent outputs in a structured prompt
  3. It synthesizes a unified answer and ends its response with CONFIDENCE: 0.85
  4. The confidence line is parsed out and stored on AgentResult.confidence
  5. When cite_sources=True, APEX adds [source: agent_name] inline markers
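
Steps 3 and 4 can be sketched as follows. This is a minimal illustration, not deepcrew's actual parser: parse_confidence is a hypothetical helper, and clamping to 0.0–1.0 and falling back to a default when the line is missing are assumptions.

```python
import re

# Hypothetical helper, not part of deepcrew: split a synthesized answer
# from its trailing "CONFIDENCE: <float>" line.
_CONFIDENCE_RE = re.compile(r"^\s*CONFIDENCE:\s*([0-9]*\.?[0-9]+)\s*$", re.MULTILINE)


def parse_confidence(response: str, default: float = 0.0) -> tuple[str, float]:
    """Return (answer_text, confidence); use `default` if no line is found."""
    matches = list(_CONFIDENCE_RE.finditer(response))
    if not matches:
        return response.strip(), default
    last = matches[-1]  # use the final occurrence, since the response ends with it
    confidence = min(1.0, max(0.0, float(last.group(1))))  # clamp (assumption)
    answer = (response[: last.start()] + response[last.end():]).strip()
    return answer, confidence


text, conf = parse_confidence(
    "Inflation rises when demand outpaces supply.\nCONFIDENCE: 0.85"
)
print(text)
print(conf)
```

Taking the last match rather than the first keeps the parse stable if an agent's own output happens to quote a CONFIDENCE line earlier in the text.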

Basic usage

apex_example.py
from deepcrew import Agent, Orchestrator, ApexConfig

orch = Orchestrator(
    agents=[
        Agent("researcher", model="openai/gpt-4o-mini", system_prompt="Research facts."),
        Agent("analyst",    model="anthropic/claude-haiku-4-5-20251001", system_prompt="Analyze data."),
    ],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(
        cite_sources=True,           # [source: researcher] / [source: analyst] inline
        confidence_threshold=0.75,   # warn if below (future: request more agents)
        allow_tools=False,           # set True to let APEX call tools mid-synthesis (experimental)
    ),
)

result = await orch.run("What causes inflation?")
print(result.final_text)
print(f"Confidence: {result.agent_results[-1].confidence:.2%}")

Standalone APEX

python
from deepcrew import APEXSynthesizer, ApexConfig, AgentResult

# Use APEX outside of Orchestrator — e.g., synthesize cached results
apex = APEXSynthesizer(
    model="openai/gpt-4o",
    config=ApexConfig(cite_sources=True),
)

results: list[AgentResult] = [...]  # your pre-computed results

synthesis = await apex.synthesize(
    original_query="Explain quantum entanglement",
    results=results,
)

print(synthesis.text)
print(f"Confidence: {synthesis.confidence:.2f}")

# Citations
for citation in apex.build_citations(results, synthesis.text):
    print(f"[{citation.agent_id}] {citation.claim[:80]}")

ApexConfig reference

confidence_threshold
float = 0.7
Minimum acceptable confidence. Below this, the result is still returned but EventType.APEX_DONE includes a below_threshold=True flag for consumers to handle.
cite_sources
bool = True
When True, APEX is instructed to add [source: agent_name] inline markers to attribute facts to specific agents.
allow_tools
bool = False
When True, APEX itself can call tools during synthesis (e.g., verify a stat by searching). Requires providing tool_defs to synthesize().
system_prompt
str | None
Override the default APEX system prompt entirely. Must include instructions to end with CONFIDENCE: float.
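
The [source: agent_name] markers described under cite_sources are plain text, so a consumer can count or strip them with a regular expression. The sketch below assumes the marker shape shown above; citation_counts and strip_markers are hypothetical helpers, not deepcrew functions.

```python
import re
from collections import Counter

# Assumed marker shape, taken from the text above: [source: agent_name]
_MARKER_RE = re.compile(r"\s*\[source:\s*([^\]]+?)\s*\]")


def citation_counts(text: str) -> Counter:
    """Count how many markers name each agent."""
    return Counter(_MARKER_RE.findall(text))


def strip_markers(text: str) -> str:
    """Remove the markers (and the whitespace before them) for display."""
    return _MARKER_RE.sub("", text)


answer = (
    "Inflation rose in 2022 [source: researcher]. "
    "Wage growth lagged behind prices [source: analyst]. "
    "Energy costs were the main driver [source: researcher]."
)
counts = citation_counts(answer)
clean = strip_markers(answer)
print(dict(counts))
print(clean)
```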

APEX events

python
from deepcrew.types import EventType

async for event in orch.stream("..."):
    if event.event == EventType.APEX_START:
        agents = event.data["agents"]
        print(f"APEX synthesizing from {len(agents)} agents: {agents}")
    elif event.event == EventType.APEX_DONE:
        conf = event.data["confidence"]
        print(f"APEX done — confidence {conf:.2f}")
        if event.data.get("below_threshold"):  # set when conf < confidence_threshold
            print("⚠ Low confidence — consider adding more specialist agents")

Agent Spawning

deepcrew-ai v0.2.0 introduces Claude Code-style dynamic agent spawning. Any running agent can call a built-in spawn_agent tool to create a sub-agent mid-loop, with tools automatically selected from a global pool by the ToolAllocator.

How it works

  1. You provide a global_tools pool to Orchestrator
  2. With enable_spawn=True, every agent gets a spawn_agent(task, tools, model) tool injected
  3. When an agent calls spawn_agent, ToolAllocator uses the router LLM to pick the most relevant tools from the global pool for that specific task
  4. A fresh sub-agent is created and runs to completion, its result returned to the parent
  5. A SPAWN_AGENT stream event is emitted for observability

Enable spawning via Orchestrator

spawning_example.py
from deepcrew import Agent, Orchestrator, tool

@tool
def search_web(query: str) -> str:
    "Search the web."
    ...

@tool
def read_file(path: str) -> str:
    "Read a local file."
    ...

@tool
def run_sql(query: str) -> list[dict]:
    "Execute a SQL query."
    ...

@tool
def call_api(url: str, method: str = "GET") -> dict:
    "Make an HTTP API call."
    ...

master = Agent(
    name="coordinator",
    model="openai/gpt-4o",
    system_prompt="""You are a coordinator agent. For complex subtasks,
    use the spawn_agent tool to delegate to a specialized sub-agent.""",
)

orch = Orchestrator(
    agents=[master],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    global_tools=[search_web, read_file, run_sql, call_api],  # pool
    enable_spawn=True,   # injects spawn_agent tool into master
)

result = await orch.run(
    "Research the top 3 AI papers from last month and summarize key findings."
)
print(result.final_text)

Standalone spawning

python
from deepcrew import spawn_agent, SpawnRequest

request = SpawnRequest(
    task="Find all Python files that import pandas and list their names.",
    tools=["read_file", "search_web"],   # hint: names from global pool
    model="openai/gpt-4o-mini",
    system_prompt="You are a code analysis assistant.",
    max_turns=5,
)

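# search_web_def / read_file_def: tool definitions for the global pool (defined elsewhere)
# my_queue: the queue that receives stream events such as SPAWN_AGENT
result = await spawn_agent(
    request=request,
    all_tool_defs=[search_web_def, read_file_def],
    parent_queue=my_queue,
    router_model="openai/gpt-4o-mini",
    parent_agent_id="coordinator",
)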

print(result.text)

ToolAllocator

python
from deepcrew import ToolAllocator

allocator = ToolAllocator(router_model="openai/gpt-4o-mini")

# Given a task description and a large pool of tools,
# returns only the most relevant subset (up to max_tools)
relevant_tools = await allocator.allocate(
    task="Analyze sentiment in customer reviews and generate a report",
    all_tools=my_tool_defs,   # list[ToolDef] — could be dozens of tools
    max_tools=5,              # return at most 5
)

print([t.name for t in relevant_tools])
Tip: The ToolAllocator prompt includes each tool's name and description. A good tool description dramatically improves allocation accuracy.
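
To show why descriptions matter, here is a rough sketch of how a selection prompt could be assembled from names and descriptions, and how the router's reply could be validated against the pool. The prompt wording, ToolInfo, build_allocation_prompt, and parse_selection are all hypothetical; deepcrew's real prompt and parsing may differ.

```python
from dataclasses import dataclass


@dataclass
class ToolInfo:
    """Hypothetical stand-in for a tool definition: a name and a description."""
    name: str
    description: str


def build_allocation_prompt(task: str, tools: list[ToolInfo], max_tools: int) -> str:
    """Assemble a tool-selection prompt. The wording is illustrative only."""
    catalog = "\n".join(
        f"- {t.name}: {t.description or '(no description)'}" for t in tools
    )
    return (
        f"Task: {task}\n\n"
        f"Available tools:\n{catalog}\n\n"
        f"Reply with up to {max_tools} tool names, comma-separated."
    )


def parse_selection(reply: str, tools: list[ToolInfo], max_tools: int) -> list[str]:
    """Keep only names that exist in the pool, in reply order, capped at max_tools."""
    known = {t.name for t in tools}
    picked: list[str] = []
    for name in (part.strip() for part in reply.split(",")):
        if name in known and name not in picked:
            picked.append(name)
    return picked[:max_tools]


pool = [
    ToolInfo("search_web", "Search the web."),
    ToolInfo("run_sql", "Execute a SQL query."),
    ToolInfo("read_file", ""),
]
prompt = build_allocation_prompt("Count orders per region", pool, max_tools=2)
selected = parse_selection("run_sql, made_up_tool, search_web, run_sql", pool, 2)
print(prompt)
print(selected)
```

A tool with an empty description, like read_file here, gives the router nothing to match against the task, which is the failure mode the tip warns about.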

SpawnRequest reference

task*
str
Natural language description of the sub-task. Used both for tool allocation and as the sub-agent's first user message.
tools
list[str] = []
Optional hint: tool names the parent agent thinks the sub-agent should have. ToolAllocator still makes the final decision.
model
str | None
Model string for the sub-agent. Defaults to the parent agent's model.
system_prompt
str | None
Optional system prompt override for the sub-agent.
max_turns
int = 5
Max tool-call cycles for the sub-agent.

Looping Methodology

The outer iteration loop is distinct from the inner per-turn max_turns cycle. The loop runs the entire agent (including all its tool calls) and re-runs it if the result doesn't meet a convergence criterion — ideal for search-refine, draft-critique, and iterative research patterns.

How it works

  1. Agent runs (all inner turns until no more tool calls)
  2. convergence_fn(result) is called — if it returns True, the loop exits
  3. If not converged, refine_prompt is appended and the agent runs again
  4. Loop exits when max_iterations is reached or convergence is achieved
  5. result.loop_iterations records how many outer iterations ran
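
The five steps above can be sketched as a small standalone loop. Result, outer_loop, and fake_agent are hypothetical stand-ins, and appending the previous draft as an assistant message before the refine prompt is an assumption about how the conversation is extended.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class Result:
    """Hypothetical stand-in for AgentResult."""
    text: str
    loop_iterations: int = 0


async def outer_loop(
    run_once: Callable[[list[dict]], Awaitable[Result]],
    messages: list[dict],
    convergence_fn: Callable[[Result], bool],
    refine_prompt: str,
    max_iterations: int = 5,
) -> Result:
    """Re-run the whole agent until convergence_fn passes or the cap is hit."""
    history = list(messages)
    result = Result(text="")
    for iteration in range(1, max_iterations + 1):
        result = await run_once(history)      # step 1: full inner run
        result.loop_iterations = iteration    # step 5: record the count
        if convergence_fn(result):            # step 2: converged, stop
            break
        # step 3: keep the draft (assumption) and ask for a refinement
        history.append({"role": "assistant", "content": result.text})
        history.append({"role": "user", "content": refine_prompt})
    return result                             # step 4: cap reached or converged


async def fake_agent(history: list[dict]) -> Result:
    """Toy agent: its answer grows by one line per user message seen."""
    user_turns = sum(1 for m in history if m["role"] == "user")
    return Result(text="\n".join(f"point {i}" for i in range(1, user_turns + 1)))


final = asyncio.run(
    outer_loop(
        fake_agent,
        [{"role": "user", "content": "Explain CRISPR"}],
        convergence_fn=lambda r: r.text.count("\n") >= 2,  # at least 3 lines
        refine_prompt="Your answer is incomplete. Expand it.",
        max_iterations=4,
    )
)
print(final.loop_iterations, final.text.splitlines())
```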

Basic loop with convergence

loop_example.py
from deepcrew import Agent, run_agent, LoopConfig, tool

@tool
def search_web(query: str) -> str:
    "Search the web for information."
    ...

agent = Agent(
    name="researcher",
    model="openai/gpt-4o-mini",
    system_prompt="You are a thorough researcher. Use search to gather comprehensive information.",
    tools=[search_web],
    loop_config=LoopConfig(
        max_iterations=4,
        # Convergence: result is long enough to be a proper answer
        convergence_fn=lambda r: len(r.text) > 800 and r.text.count("\n") > 5,
        # What to say to the agent if not converged
        refine_prompt="Your answer is incomplete. Search for more details and expand your response significantly.",
    ),
)

result = await run_agent(
    agent,
    [{"role": "user", "content": "Explain the mechanism of CRISPR-Cas9 gene editing"}],
)

print(result.text)
print(f"Iterations: {result.loop_iterations}")

run_agent_loop() directly

python
from deepcrew import run_agent_loop, LoopConfig

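# my_agent: an Agent defined elsewhere, assumed to have loop_config set
# my_queue: the queue that receives LOOP_ITERATION events
result = await run_agent_loop(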
    agent=my_agent,
    messages=[{"role": "user", "content": "Draft an executive summary"}],
    tool_defs=None,
    queue=my_queue,
    agent_id="drafter",
)

search_loop() — confidence-based iteration

python
from deepcrew import search_loop, Agent, tool

@tool
def search_web(query: str) -> str:
    "Search the web."
    ...

agent = Agent("searcher", model="openai/gpt-4o-mini",
              system_prompt="You are a research agent.", tools=[search_web])

# Keeps searching until APEX confidence >= 0.8 or 3 iterations
result = await search_loop(
    query="What is the current state of nuclear fusion research?",
    search_tool=search_web,
    agent=agent,
    max_iterations=3,
    confidence_threshold=0.8,
)

Stop condition (early exit)

python
from deepcrew import Agent, LoopConfig, LoopConvergedError

def check_done(result):
    # Raise LoopConvergedError to stop immediately and return result
    if "FINAL ANSWER:" in result.text:
        raise LoopConvergedError(result)
    return False

agent = Agent("reasoner", model="openai/gpt-4o",
              system_prompt="When you have a final answer, prefix it with 'FINAL ANSWER:'.",
              loop_config=LoopConfig(
                  max_iterations=6,
                  convergence_fn=check_done,
              ))
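
The exception-based exit can be pictured with the standalone sketch below. LoopConverged and run_with_early_exit are hypothetical analogues written for illustration; they only show one way a loop could treat "return True" and "raise with a result" as two exit paths.

```python
class LoopConverged(Exception):
    """Hypothetical analogue of LoopConvergedError: carries the result to return."""

    def __init__(self, result):
        super().__init__("loop converged")
        self.result = result


def run_with_early_exit(produce, convergence_fn, max_iterations):
    """Call produce(i) up to max_iterations times; stop on True or on the exception."""
    result = None
    for i in range(1, max_iterations + 1):
        result = produce(i)
        try:
            if convergence_fn(result):
                return result, i
        except LoopConverged as stop:
            return stop.result, i  # the exception's payload is what gets returned
    return result, max_iterations


def check_done(text):
    # Raise to stop immediately, handing back only the part after the prefix.
    if "FINAL ANSWER:" in text:
        raise LoopConverged(text.split("FINAL ANSWER:", 1)[1].strip())
    return False


drafts = {1: "thinking...", 2: "still thinking...", 3: "FINAL ANSWER: 42"}
answer, iterations = run_with_early_exit(drafts.get, check_done, max_iterations=6)
print(answer, iterations)
```

Raising instead of returning True lets the check hand back a different object than the one it inspected, here the answer with its prefix removed.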

LoopConfig reference

max_iterations
int = 5
Hard limit on outer loop iterations. Loop always exits after this many runs, converged or not.
convergence_fn
Callable | None
Called with the current AgentResult. Return True to stop. Raise LoopConvergedError(result) for immediate exit with that result.
stop_condition
Callable | None
Alternative to convergence_fn. Raises LoopConvergedError on its own — useful for externalizing early-exit logic.
refine_prompt
str
Appended to conversation on each non-converged iteration. Default: "Your answer is incomplete. Please search for more information and expand your response."

Loop events

python
from deepcrew.types import EventType

while True:
    event = await queue.get()
    if event is None: break
    if event.event == EventType.LOOP_ITERATION:
        i = event.data["iteration"]
        converged = event.data["converged"]
        print(f"Loop iteration {i} — {'converged' if converged else 'refining...'}")

Skills

Skills are higher-level capability bundles. They look identical to tools from the LLM's perspective (both become ToolDef), but can wrap multi-step logic, sub-agents, or external APIs internally. Three built-ins are included; you can also create custom skills with the @skill decorator.
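
As a rough picture of how a decorated function could become a tool definition, the sketch below derives a JSON-Schema-style parameters dict from a function signature with the standard inspect module. schema_from_signature and its type mapping are hypothetical; how @skill actually builds its ToolDef is not specified here.

```python
import inspect
from typing import Any, Callable

# Minimal Python-to-JSON-Schema type mapping (assumption; unknown types fall back to string)
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def schema_from_signature(fn: Callable[..., Any]) -> dict[str, Any]:
    """Build a JSON-Schema-style `parameters` dict from a function signature."""
    properties: dict[str, Any] = {}
    required: list[str] = []
    for name, param in inspect.signature(fn).parameters.items():
        prop: dict[str, Any] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default means the LLM must supply it
        else:
            prop["default"] = param.default
        properties[name] = prop
    return {"type": "object", "properties": properties, "required": required}


def translate(text: str, target_language: str = "es") -> str:
    return text


schema = schema_from_signature(translate)
print(schema)
```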

Built-in skills

skills_example.py
from deepcrew import Agent, run_agent
from deepcrew import WebSearchSkill, SummarizeSkill, CodeExecutionSkill

agent = Agent(
    name="assistant",
    model="openai/gpt-4o",
    system_prompt="You are a versatile AI assistant.",
    skills=[
        WebSearchSkill(),                            # DuckDuckGo Instant Answer API
        SummarizeSkill(model="openai/gpt-4o-mini"),  # LLM-backed summarization
        CodeExecutionSkill(timeout=15.0),            # sandboxed Python subprocess
    ],
)

result = await run_agent(agent, [
    {"role": "user", "content": "Search for Python async best practices, summarize them, then write a demo script and run it."}
])
print(result.text)

@skill decorator — custom skills

python
from deepcrew import skill, Agent

@skill(name="translate", description="Translate text to another language")
async def translate(text: str, target_language: str) -> str:
    """
    Args:
        text (str): The text to translate.
        target_language (str): Target language code, e.g. 'es', 'fr', 'ja'.
    """
    import httpx
    async with httpx.AsyncClient() as client:
        r = await client.post(
            "https://libretranslate.de/translate",
            json={"q": text, "source": "auto", "target": target_language},
        )
        r.raise_for_status()  # surface HTTP errors instead of a KeyError below
        return r.json()["translatedText"]

@skill(name="send_slack", description="Send a message to a Slack channel")
async def send_slack(channel: str, message: str) -> str:
    """
    Args:
        channel (str): Slack channel name (without #).
        message (str): Message text to send.
    """
    import os
    import httpx
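    async with httpx.AsyncClient() as client:
        r = await client.post(
            "https://slack.com/api/chat.postMessage",
            headers={"Authorization": f"Bearer {os.environ['SLACK_BOT_TOKEN']}"},
            json={"channel": channel, "text": message},
        )
    # Slack reports API errors in the JSON body ("ok": false), not the HTTP status
    body = r.json()
    if not body.get("ok"):
        return f"Slack error: {body.get('error', 'unknown')}"
    return f"Message sent to #{channel}"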

agent = Agent(
    "comms",
    model="openai/gpt-4o",
    system_prompt="Help with communications and translations.",
    skills=[translate, send_slack],
)

Skill class — full custom implementation

python
from deepcrew import Agent
from deepcrew.skills.base import Skill

class DatabaseQuerySkill(Skill):
    name = "database_query"
    description = "Execute a read-only SQL query against the production database"
    parameters = {
        "type": "object",
        "properties": {
            "sql": {"type": "string", "description": "SQL SELECT query to execute"},
            "limit": {"type": "integer", "description": "Max rows to return", "default": 100},
        },
        "required": ["sql"],
    }

    def __init__(self, connection_string: str):
        self._conn_str = connection_string

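    async def execute(self, sql: str, limit: int = 100) -> str:
        import asyncpg
        # Wrap the query as a subquery so the row cap still applies when the
        # caller's SQL has its own LIMIT or a trailing semicolon.
        query = f"SELECT * FROM ({sql.strip().rstrip(';')}) AS q LIMIT $1"
        conn = await asyncpg.connect(self._conn_str)
        try:
            # A read-only transaction makes Postgres reject any write statement
            async with conn.transaction(readonly=True):
                rows = await conn.fetch(query, limit)
            return str([dict(r) for r in rows])
        finally:
            await conn.close()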

# Use it:
agent = Agent("db_agent", model="openai/gpt-4o",
              skills=[DatabaseQuerySkill("postgresql://...")])

SkillRegistry

python
from deepcrew import SkillRegistry
from deepcrew import WebSearchSkill, SummarizeSkill

# Register globally
SkillRegistry.register(WebSearchSkill())
SkillRegistry.register(SummarizeSkill())

# Retrieve by name
search = SkillRegistry.get("web_search")
summarize = SkillRegistry.get("summarize")

# List all registered skills
for skill in SkillRegistry.list_all():
    print(f"{skill.name}: {skill.description}")

Built-in skill reference

Class              | Tool name    | Description                                         | Config
WebSearchSkill     | web_search   | DuckDuckGo Instant Answer API. Returns top results. | None
SummarizeSkill     | summarize    | LLM-backed text summarization.                      | model="openai/gpt-4o-mini"
CodeExecutionSkill | execute_code | Runs Python in an isolated subprocess.              | timeout=10.0

Infrastructure

Memory Providers

Memory providers let agents maintain context across turns, runs, and even restarts. They auto-inject into the LLM call (relevant memories added as a system message), and auto-store tool results for future retrieval.

InMemoryProvider — short-term context

memory_example.py
from deepcrew import Agent, run_agent, InMemoryProvider

memory = InMemoryProvider()

agent = Agent(
    name="assistant",
    model="openai/gpt-4o-mini",
    system_prompt="You are a helpful assistant with memory.",
    memory=memory,
)

# First interaction
result1 = await run_agent(agent, [
    {"role": "user", "content": "My name is Alice and I'm building a Python library."}
])

# Memory automatically stores tool results and LLM context
# Second interaction — agent will remember Alice's project
result2 = await run_agent(agent, [
    {"role": "user", "content": "What was my project about again?"}
])

FileMemoryProvider — persistent context

python
from pathlib import Path
from deepcrew import Agent, run_agent, FileMemoryProvider

# Persists across process restarts
memory = FileMemoryProvider(Path.home() / ".deepcrew" / "my_agent_memory.json")

agent = Agent(
    name="persistent_bot",
    model="openai/gpt-4o-mini",
    memory=memory,
)

# All tool results are atomically written to the JSON file
# On next startup, memories are loaded and injected into context
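
An atomic JSON write is commonly done by writing to a temporary file and renaming it over the target, so a crash never leaves a half-written file. The sketch below shows that general technique with the standard library only; it is an assumption about the approach, not FileMemoryProvider's actual code, and atomic_write_json and load_json are hypothetical names.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, data: dict) -> None:
    """Write JSON to a temp file in the same directory, then swap it into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            json.dump(data, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())  # push bytes to disk before the rename
        # Atomic on POSIX when both paths are on the same filesystem
        os.replace(tmp_name, path)
    except BaseException:
        # Remove the temp file if anything failed before the swap
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def load_json(path: Path) -> dict:
    """Load stored memories, or start empty if the file does not exist yet."""
    return json.loads(path.read_text(encoding="utf-8")) if path.exists() else {}


with tempfile.TemporaryDirectory() as tmp_dir:
    memory_file = Path(tmp_dir) / "memory.json"
    atomic_write_json(memory_file, {"tool:search_web": "result text"})
    restored = load_json(memory_file)
    leftovers = [p.name for p in Path(tmp_dir).iterdir()]
print(restored, leftovers)
```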

Custom MemoryProvider

python
from deepcrew import Agent
from deepcrew.memory.base import MemoryProvider

class RedisMemoryProvider(MemoryProvider):
    """Redis-backed memory for distributed agent deployments."""

    def __init__(self, redis_url: str, ttl: int = 3600):
        import redis.asyncio as redis
        self._client = redis.from_url(redis_url)
        self._ttl = ttl

    async def store(self, key: str, value: str) -> None:
        await self._client.setex(key, self._ttl, value)

    async def retrieve(self, key: str) -> str | None:
        val = await self._client.get(key)
        return val.decode() if val else None

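    async def search(self, query: str, top_k: int = 5) -> list[str]:
        # Glob match on key names (not values), using incremental SCAN rather
        # than the blocking KEYS command. Production should use semantic search.
        results: list[str] = []
        async for key in self._client.scan_iter(match=f"*{query[:20]}*"):
            val = await self._client.get(key)
            if val:
                results.append(val.decode())
            if len(results) >= top_k:
                break
        return results

    async def clear(self) -> None:
        # Caution: flushdb() wipes the whole Redis database, not only agent keys
        await self._client.flushdb()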

agent = Agent("distributed", model="openai/gpt-4o",
              memory=RedisMemoryProvider("redis://localhost:6379"))

Memory events

python
from deepcrew.types import EventType

# Events arrive on the queue passed to run_agent(..., queue=queue)
while True:
    event = await queue.get()
    if event is None: break
    if event.event == EventType.MEMORY_RETRIEVE:
        n = event.data["count"]
        print(f"Injected {n} memories into context")
    elif event.event == EventType.MEMORY_STORE:
        key = event.data["key"]
        print(f"Stored tool result to memory: {key}")

MemoryProvider ABC

python
from abc import ABC, abstractmethod

class MemoryProvider(ABC):
    @abstractmethod
    async def store(self, key: str, value: str) -> None: ...
    @abstractmethod
    async def retrieve(self, key: str) -> str | None: ...
    @abstractmethod
    async def search(self, query: str, top_k: int = 5) -> list[str]: ...
    @abstractmethod
    async def clear(self) -> None: ...
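
The smallest useful implementation of these four methods is a dictionary. The sketch below is standalone (it does not subclass the real ABC so it can run without deepcrew installed) and also shows one way retrieved memories could be prepended as a system message. DictMemory, inject_memories, the keyword ranking, and the "Relevant memories:" format are all assumptions.

```python
import asyncio
from typing import Optional


class DictMemory:
    """Hypothetical in-process store with the same four async methods."""

    def __init__(self) -> None:
        self._items: dict[str, str] = {}

    async def store(self, key: str, value: str) -> None:
        self._items[key] = value

    async def retrieve(self, key: str) -> Optional[str]:
        return self._items.get(key)

    async def search(self, query: str, top_k: int = 5) -> list[str]:
        # Naive ranking: count how many query words appear in each value
        words = {w.lower() for w in query.split()}
        scored = [
            (sum(w in value.lower() for w in words), value)
            for value in self._items.values()
        ]
        ranked = sorted(scored, key=lambda pair: -pair[0])
        return [value for score, value in ranked if score > 0][:top_k]

    async def clear(self) -> None:
        self._items.clear()


async def inject_memories(memory: DictMemory, messages: list[dict]) -> list[dict]:
    """Prepend matching memories as a system message (assumed injection format)."""
    hits = await memory.search(messages[-1]["content"])
    if not hits:
        return messages
    note = "Relevant memories:\n" + "\n".join(f"- {h}" for h in hits)
    return [{"role": "system", "content": note}, *messages]


async def demo() -> list[dict]:
    memory = DictMemory()
    await memory.store("fact:1", "Alice is building a Python library.")
    await memory.store("fact:2", "The weather was sunny.")
    return await inject_memories(
        memory, [{"role": "user", "content": "What is Alice building?"}]
    )


messages = asyncio.run(demo())
print(messages[0]["content"])
```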

Retry & Fallback Policies

Configure per-agent retry behavior with exponential backoff, and model fallback chains that activate when all retries fail.

Basic retry

retry_example.py
from deepcrew import Agent, RetryPolicy

agent = Agent(
    name="resilient",
    model="openai/gpt-4o",
    system_prompt="Be helpful.",
    retry_policy=RetryPolicy(
        max_retries=3,          # try up to 3 more times after the first failure
        backoff_seconds=1.0,    # base wait between retries
        exponential=True,       # waits 1s, 2s, 4s across the 3 retries (doubles each time)
        retry_on=(Exception,),  # retry on any exception (default)
    ),
)

Retry specific exceptions only

python
import litellm
from deepcrew import Agent, RetryPolicy

# Only retry on rate limit and connection errors
agent = Agent(
    "selective_retry",
    model="openai/gpt-4o",
    retry_policy=RetryPolicy(
        max_retries=5,
        backoff_seconds=2.0,
        retry_on=(
            litellm.RateLimitError,
            litellm.APIConnectionError,
            TimeoutError,
        ),
    ),
)

Fallback chain

python
from deepcrew import Agent, RetryPolicy, FallbackChain

agent = Agent(
    name="fault_tolerant",
    model="openai/gpt-4o",           # primary model
    retry_policy=RetryPolicy(
        max_retries=2,
        backoff_seconds=1.0,
    ),
    fallback_chain=FallbackChain(models=[
        "anthropic/claude-haiku-4-5-20251001",  # try first if gpt-4o fails all retries
        "gemini/gemini-2.0-flash",               # try second
        "ollama/llama3.2",                       # local fallback
    ]),
)

# Flow: gpt-4o → retry 1 → retry 2 → claude-haiku → retry 1 → retry 2 → gemini → ...
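
The flow in the comment above can be sketched as a nested loop: each model gets one call plus max_retries retries before the next model is tried. call_with_fallback and flaky are hypothetical; a real policy would also filter exceptions by retry_on, which this sketch skips.

```python
import asyncio


async def call_with_fallback(call, models, max_retries, backoff_seconds=0.0):
    """Try each model with up to max_retries extra attempts before moving on."""
    attempts = []
    last_error = None
    for model in models:
        for attempt in range(max_retries + 1):  # first call + retries
            attempts.append(model)
            try:
                return await call(model), attempts
            except Exception as exc:  # a real policy would check retry_on here
                last_error = exc
                if attempt < max_retries:
                    await asyncio.sleep(backoff_seconds * 2 ** attempt)
    raise RuntimeError(f"all models failed after {len(attempts)} calls") from last_error


async def flaky(model):
    """Toy LLM call: only the Gemini model succeeds."""
    if model != "gemini/gemini-2.0-flash":
        raise ConnectionError(f"{model} unavailable")
    return f"answer from {model}"


chain = [
    "openai/gpt-4o",
    "anthropic/claude-haiku-4-5-20251001",
    "gemini/gemini-2.0-flash",
]
text, attempts = asyncio.run(call_with_fallback(flaky, chain, max_retries=2))
print(text)
print(len(attempts), attempts)
```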

Retry events

python
from deepcrew.types import EventType

while True:
    event = await queue.get()
    if event is None: break
    if event.event == EventType.RETRY_ATTEMPT:
        data = event.data
        print(f"Retry {data['attempt']} on {data['model']} — waiting {data['delay']:.1f}s")
    elif event.event == EventType.FALLBACK_TRIGGERED:
        print(f"Falling back from {event.data['from_model']} → {event.data['to_model']}")

RetryPolicy reference

max_retries
int = 3
Number of additional attempts after the first failure. max_retries=3 means up to 4 total calls per model.
backoff_seconds
float = 1.0
Base wait time in seconds between retries. With exponential=True, this doubles each retry.
retry_on
tuple[type[Exception], ...] = (Exception,)
Exception types that trigger a retry. Use specific types like litellm.RateLimitError to avoid retrying logic errors.
exponential
bool = True
Whether to double the backoff time on each retry. False gives fixed backoff.
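
The wait schedule implied by these fields can be computed directly. This sketch assumes the first retry waits backoff_seconds and that there is no jitter or upper cap; backoff_delays is a hypothetical helper.

```python
def backoff_delays(
    max_retries: int = 3,
    backoff_seconds: float = 1.0,
    exponential: bool = True,
) -> list[float]:
    """Wait before each retry: base * 2**n when exponential, else the fixed base."""
    return [
        backoff_seconds * (2 ** n if exponential else 1)
        for n in range(max_retries)
    ]


defaults = backoff_delays()
slower = backoff_delays(5, 2.0)
fixed = backoff_delays(3, 1.5, exponential=False)
print(defaults)
print(slower)
print(fixed)
```

With the defaults, one model can wait up to 7 seconds in total (1 + 2 + 4) before its retries are exhausted.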

Observability (OpenTelemetry)

deepcrew-ai emits OpenTelemetry spans for every LLM call, tool execution, and workflow step. When observability=None (the default), every span context manager is a nullcontext(), so tracing does no exporter work and adds only negligible overhead.
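
The nullcontext() pattern can be illustrated without OpenTelemetry installed. In this sketch a toy recording span stands in for a real tracer; span, _recording_span, and call_llm are hypothetical names, and only the "return nullcontext() when unconfigured" idea comes from the text above.

```python
from contextlib import contextmanager, nullcontext

recorded = []  # stands in for an exporter in this sketch


@contextmanager
def _recording_span(name, **attributes):
    """Toy span: records start and end instead of exporting anything."""
    recorded.append(("start", name, attributes))
    try:
        yield
    finally:
        recorded.append(("end", name, attributes))


def span(observability, name, **attributes):
    """Return a real span when configured, otherwise a do-nothing context manager."""
    if observability is None:
        return nullcontext()
    return _recording_span(name, **attributes)


def call_llm(observability=None):
    # The call site is identical whether or not tracing is enabled
    with span(observability, "deepcrew.llm.call", model="openai/gpt-4o"):
        return "response"


call_llm()  # observability=None: nothing is recorded
events_without = len(recorded)
call_llm(observability={"enabled": True})
events_with = len(recorded)
print(events_without, events_with)
```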

Installation

shell
pip install "deepcrew-ai[otel]"
# Installs: opentelemetry-api, opentelemetry-sdk, opentelemetry-exporter-otlp

Quick start with Jaeger

observability_example.py
from deepcrew import Agent, Orchestrator, ObservabilityConfig, ApexConfig

obs = ObservabilityConfig(
    otel_endpoint="http://localhost:4317",  # gRPC endpoint
    service_name="my-ai-app",
    enabled=True,
    export_format="grpc",   # or "http" for HTTP/protobuf
)

orch = Orchestrator(
    agents=[
        Agent("researcher", model="openai/gpt-4o-mini", system_prompt="Research."),
        Agent("writer", model="anthropic/claude-haiku-4-5-20251001", system_prompt="Write."),
    ],
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(cite_sources=True),
)

# Pass observability to run(); it is propagated to all agents automatically
# (Orchestrator.run() accepts **kwargs forwarded to run_agent)
result = await orch.run("Explain blockchain technology", observability=obs)

With run_agent()

python
from deepcrew import Agent, run_agent, ObservabilityConfig

obs = ObservabilityConfig(otel_endpoint="http://localhost:4317")

result = await run_agent(
    agent,
    [{"role": "user", "content": "Hello!"}],
    observability=obs,   # that's it
)

# Spans emitted:
# deepcrew.agent.run     → covers entire agent lifecycle
#   deepcrew.llm.call    → each LLM request (includes model, agent_id, tokens)
#   deepcrew.tool.call   → each tool execution (includes tool_name, agent_id)

With WorkflowBuilder

python
from deepcrew import WorkflowBuilder, ObservabilityConfig

obs = ObservabilityConfig(otel_endpoint="http://localhost:4317", service_name="workflow-app")

workflow = (
    WorkflowBuilder(observability=obs)  # pass at construction time
    .add_agent("step1", agent1, task="{input}")
    .add_agent("step2", agent2, task="Refine:\n{step1}")
    .then("step1", "step2")
)

result = await workflow.run("My query")

# Spans emitted per step:
# deepcrew.workflow.step   → covers each DAG node
#   deepcrew.agent.run     → the agent within the step
#     deepcrew.llm.call
#     deepcrew.tool.call

Start Jaeger locally (Docker)

shell
# 16686 = Jaeger UI, 4317 = OTLP gRPC, 4318 = OTLP HTTP
docker run -d --name jaeger \
  -e COLLECTOR_OTLP_ENABLED=true \
  -p 6831:6831/udp \
  -p 16686:16686 \
  -p 4317:4317 \
  -p 4318:4318 \
  jaegertracing/all-in-one:latest

# Open http://localhost:16686 to view traces

ObservabilityConfig reference

otel_endpoint
str | None
OTLP collector endpoint. For gRPC: http://localhost:4317. For HTTP: http://localhost:4318/v1/traces.
service_name
str = "deepcrew"
Service name in traces. Use your app name for easy filtering in Jaeger/Grafana.
enabled
bool = True
Master switch. Set to False for no-op without removing the ObservabilityConfig object.
export_format
"grpc" | "http"
OTLP export protocol. "grpc" for port 4317, "http" for port 4318.

Span attributes

Span                   | Attributes
deepcrew.agent.run     | agent.id, llm.model
deepcrew.llm.call      | agent.id, llm.model, llm.input_tokens, llm.output_tokens
deepcrew.tool.call     | agent.id, tool.name
deepcrew.workflow.step | step.name

CLI — deepcrew run

Run declarative YAML workflow files from the terminal. No Python code required for simple workflows.

Installation check

shell
pip install deepcrew-ai
deepcrew --version
# deepcrew-ai 0.2.0

Write a workflow YAML

workflow.yaml
agents:
  - name: researcher
    model: openai/gpt-4o-mini
    system_prompt: Research the topic thoroughly using all available information.
    tools:
      - web_search   # built-in skill by name

  - name: analyst
    model: anthropic/claude-haiku-4-5-20251001
    system_prompt: Critically analyze the research findings. Identify gaps and strengths.

  - name: writer
    model: openai/gpt-4o
    system_prompt: Write a clear, well-structured executive summary report.
    tools:
      - summarize   # built-in summarize skill

workflow:
  - step: research
    agent: researcher
    task: "{input}"

  - step: analysis
    agent: analyst
    task: |
      Analyze this research:
      {research}
    depends_on:
      - research

  - step: report
    agent: writer
    task: |
      Write an executive summary based on:

      Research: {research}
      Analysis: {analysis}
    depends_on:
      - research
      - analysis

Run it

shell
# Stream output to terminal
deepcrew run workflow.yaml --input "The future of autonomous vehicles"

# Non-streaming (prints only final result)
deepcrew run workflow.yaml --input "Quantum computing in 2026" --no-stream

# List all agents in a config
deepcrew agents list --config workflow.yaml

YAML schema

agents[].name*
str
Agent identifier, referenced in workflow steps.
agents[].model*
str
LiteLLM model string.
agents[].system_prompt
str
Agent's system prompt.
agents[].tools
list[str]
Built-in skill names: web_search, summarize, execute_code.
agents[].max_turns
int = 10
Max inner loop turns for this agent.
agents[].temperature
float | null
Sampling temperature.
workflow[].step*
str
Step name — used as a variable {step_name} in subsequent task templates.
workflow[].agent*
str
References an agent by name.
workflow[].task
str
Task template. {input} is the CLI --input value. {step_name} is the text output of that step.
workflow[].depends_on
list[str]
Step names this step depends on. Steps without overlapping dependencies run in parallel.

Supported built-in tool names

YAML name    | Skill class        | Description
web_search   | WebSearchSkill     | DuckDuckGo search
summarize    | SummarizeSkill     | LLM-backed summarization
execute_code | CodeExecutionSkill | Python subprocess execution

Note: For advanced workflows with custom Python tools, memory providers, or observability, use the Python API directly. The CLI is designed for simple, shareable workflows that don't need custom code.