pip install deepcrew-ai

Multi-agent AI, without the boilerplate.

Build parallel AI workflows with any LLM provider. APEX synthesis, intelligent agent spawning, iterative looping, skills, memory, and OpenTelemetry observability — all async-first.

quickstart.py
import asyncio

from deepcrew import Agent, Orchestrator, ApexConfig

researcher = Agent("researcher", model="openai/gpt-4o-mini",
                   system_prompt="Research thoroughly.")
analyst    = Agent("analyst",    model="anthropic/claude-haiku-4-5-20251001",
                   system_prompt="Analyze critically.")
writer     = Agent("writer",     model="gemini/gemini-2.0-flash",
                   system_prompt="Write clearly.")

orch = Orchestrator(
    agents=[researcher, analyst, writer],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(cite_sources=True),
)

async def main():
    # await is only valid inside a coroutine, so the run is wrapped in main()
    result = await orch.run("Future of renewable energy")
    print(result.final_text)
    print(f"Confidence: {result.agent_results[-1].confidence:.2f}")

asyncio.run(main())

[Diagram: the Orchestrator (gpt-4o-mini) routes the request; the researcher (gpt-4o-mini), analyst (claude-haiku), and writer (gemini-flash) agents run in parallel via asyncio.gather; APEX then synthesizes the results with a confidence score.]

Parallel by default

Independent agents and tool calls always run concurrently via asyncio.gather — no extra configuration needed.

100+ LLM providers

LiteLLM handles OpenAI, Anthropic, Gemini, Bedrock, Azure, Ollama — same API, zero provider-specific code.

APEX synthesis

Confidence-scored, citation-aware multi-agent synthesis engine. Know how sure your AI is about its answer.

Agent spawning

Any agent can spawn sub-agents mid-loop with intelligent tool allocation. Claude Code-style dynamic orchestration.

Skills + MCP tools

Reusable capability bundles via @skill, built-ins like WebSearchSkill, and MCP stdio/HTTP/SSE transports.

Observability

OpenTelemetry spans for every LLM call, tool execution, and workflow step. Zero overhead when disabled.

Memory providers

Pluggable short-term and persistent context stores. Memories auto-inject into agent context before LLM calls.

Retry & fallback

Per-agent exponential backoff with configurable model fallback chains, so transient errors are retried instead of failing the run immediately.

Getting started

Installation

Requires Python 3.11+. Install from PyPI:

shell
# Core library
pip install deepcrew-ai

# With OpenTelemetry support
pip install "deepcrew-ai[otel]"

# With dev dependencies (pytest, respx)
pip install "deepcrew-ai[dev]"

Set your API keys

shell
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
export GEMINI_API_KEY="AIza..."
export AWS_ACCESS_KEY_ID="..."      # for Bedrock
export AZURE_API_KEY="..."          # for Azure OpenAI

Your first agent

first_agent.py
import asyncio
from deepcrew import Agent, run_agent, tool

@tool
def get_weather(city: str) -> dict:
    "Get current weather for a city."
    return {"city": city, "temperature": 22, "condition": "sunny"}

async def main():
    agent = Agent(
        name="assistant",
        model="openai/gpt-4o-mini",
        system_prompt="You are a helpful weather assistant.",
        tools=[get_weather],
    )
    result = await run_agent(
        agent,
        [{"role": "user", "content": "What's the weather in Tokyo?"}],
    )
    print(result.text)
    print(f"Tokens: {result.input_tokens}in / {result.output_tokens}out")

asyncio.run(main())
New to v0.2.0? Check the Features guide for APEX synthesis, looping, skills, memory providers, retry policies, observability, and the CLI.
Core

Agents

An Agent is a dataclass that bundles a model, system prompt, and tools. Agents are stateless — pass conversation history into run_agent() each call.

python
from deepcrew import Agent, RetryPolicy, FallbackChain, LoopConfig, InMemoryProvider
from deepcrew import WebSearchSkill
from deepcrew.mcp import HTTPMCP

agent = Agent(
    name="researcher",
    model="anthropic/claude-opus-4-8",
    system_prompt="You are a research specialist.",
    mcps=[HTTPMCP("https://my-mcp.example.com/mcp")],
    tools=[my_fn],                          # my_fn: any @tool-decorated function defined elsewhere
    skills=[WebSearchSkill()],              # v0.2.0: reusable skill bundles
    memory=InMemoryProvider(),              # v0.2.0: context persistence
    retry_policy=RetryPolicy(max_retries=3),  # v0.2.0: retry on failure
    fallback_chain=FallbackChain([          # v0.2.0: model fallbacks
        "openai/gpt-4o",
        "gemini/gemini-2.0-flash",
    ]),
    loop_config=LoopConfig(max_iterations=3),  # v0.2.0: outer refinement loop
    max_turns=10,
    temperature=0.7,
    max_tokens=4096,
)

All Agent parameters

  • name* (str): Unique identifier used in logs, events, and result objects.
  • model* (str): LiteLLM model string: "openai/gpt-4o", "anthropic/claude-opus-4-8", "gemini/gemini-2.0-flash", etc.
  • system_prompt (str): Injected as the first system message before conversation history.
  • mcps (list[MCPClient]): MCP clients whose tools this agent can call. Discovered lazily in parallel.
  • tools (list[Callable]): Python functions decorated with @tool. JSON Schema auto-generated from type hints.
  • skills (list[Skill], v0.2): Higher-level capability bundles. Exposed to LLM identically to tools.
  • memory (MemoryProvider | None, v0.2): Pluggable context store. Relevant memories are injected before each LLM call.
  • retry_policy (RetryPolicy | None, v0.2): Retry config: max attempts, backoff, which exceptions to retry on.
  • fallback_chain (FallbackChain | None, v0.2): Ordered list of fallback model strings to try when all retries fail.
  • loop_config (LoopConfig | None, v0.2): Outer refinement loop: run agent → check convergence → re-prompt → repeat.
  • max_turns (int = 10): Max LLM→tool→LLM cycles per run. MaxTurnsError raised if exceeded.
  • temperature (float | None): Sampling temperature. None uses the provider default.
  • max_tokens (int | None): Max output tokens. None uses the provider default.
  • extra_params (dict): Any extra kwargs forwarded verbatim to litellm.acompletion.
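
The retry_policy and fallback_chain descriptions above imply a two-level recovery strategy: retry the same model with backoff, then move on to the next model in the chain. The following is a minimal standard-library sketch of that idea, not deepcrew's implementation. The names call_with_recovery and flaky_llm and the model strings are hypothetical, and treating max_retries as "retries after the first attempt" is an assumption.

```python
import asyncio

async def call_with_recovery(call, models, max_retries=3, backoff_seconds=1.0,
                             exponential=True, retry_on=(Exception,)):
    """Try each model in order; retry each one with backoff before moving on."""
    last_error = None
    for model in models:
        # Assumption: one initial attempt plus max_retries retries per model.
        for attempt in range(max_retries + 1):
            try:
                return await call(model)
            except retry_on as exc:
                last_error = exc
                if attempt < max_retries:
                    factor = 2 ** attempt if exponential else 1
                    await asyncio.sleep(backoff_seconds * factor)
    raise RuntimeError("all models failed") from last_error


attempts = []

async def flaky_llm(model):
    """Hypothetical stand-in for an LLM call: the primary model always times out."""
    attempts.append(model)
    if model == "primary-model":
        raise TimeoutError("simulated transient failure")
    return f"answer from {model}"

answer = asyncio.run(
    call_with_recovery(flaky_llm, ["primary-model", "fallback-model"],
                       max_retries=2, backoff_seconds=0.01)
)
print(answer)
print(attempts)
```

In this run the primary model is tried three times before the fallback model answers on its first attempt.
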

Python Function Tools

Decorate any Python function with @tool. JSON Schema is generated automatically from type hints.

python
from deepcrew import tool, Agent

@tool
def search_web(query: str, max_results: int = 5) -> list[str]:
    """Search the web and return URLs.

    Args:
        query (str): The search query.
        max_results (int): Maximum results to return.
    """
    return ["https://example.com/..."]

@tool(name="db_lookup", description="Query the internal database")
def lookup_database(table: str, id: int) -> dict:
    return {"id": id, "data": "..."}

@tool
async def fetch_url(url: str) -> str:
    "Fetch the HTML content of a URL."
    import httpx
    async with httpx.AsyncClient() as client:
        r = await client.get(url, timeout=10.0)
        return r.text[:3000]

agent = Agent("assistant", model="openai/gpt-4o",
              tools=[search_web, lookup_database, fetch_url])

Supported parameter types

| Python type | JSON Schema type |
| --- | --- |
| str | "string" |
| int | "integer" |
| float | "number" |
| bool | "boolean" |
| list[str] | {"type":"array","items":{"type":"string"}} |
| dict[str, int] | {"type":"object","additionalProperties":{"type":"integer"}} |
| Optional[str] | {"type":["string","null"]} |
| Literal["a","b"] | {"enum":["a","b"]} |

Runner

run_agent() executes a single agent's agentic loop: call LLM → buffer tool calls → execute in parallel → append results → repeat.

python
from deepcrew import Agent, run_agent, ObservabilityConfig

agent = Agent("assistant", model="openai/gpt-4o")
obs   = ObservabilityConfig(otel_endpoint="http://localhost:4317")

result = await run_agent(
    agent,
    messages=[{"role": "user", "content": "Hello!"}],
    tool_defs=None,        # optional: pre-fetched ToolDef list
    queue=None,            # optional: asyncio.Queue for streaming
    agent_id=None,         # optional: override agent_id in events
    observability=obs,     # v0.2.0: optional OTel config
)

print(result.text)
print(result.confidence)      # v0.2.0: populated by APEX (None when direct)
print(result.loop_iterations) # v0.2.0: outer loop count
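
The loop described at the top of this section (call LLM → execute tool calls in parallel → append results → repeat) can be sketched without any LLM at all. The version below uses a scripted fake model and plain dicts; agent_loop, fake_llm, and TOOLS are hypothetical names, and the message shapes are simplified assumptions rather than deepcrew's internal types.

```python
import asyncio
import json

async def get_weather(city: str) -> dict:
    return {"city": city, "temperature": 22}

TOOLS = {"get_weather": get_weather}

async def fake_llm(messages):
    """Hypothetical model: requests two tool calls first, then answers."""
    tool_results = sum(m["role"] == "tool" for m in messages)
    if tool_results == 0:
        return {"role": "assistant", "content": "", "tool_calls": [
            {"id": "call_1", "name": "get_weather", "args": {"city": "Tokyo"}},
            {"id": "call_2", "name": "get_weather", "args": {"city": "Osaka"}},
        ]}
    return {"role": "assistant", "content": f"Got {tool_results} tool results.",
            "tool_calls": []}

async def agent_loop(llm, messages, max_turns=10):
    messages = list(messages)              # do not mutate the caller's history
    for _ in range(max_turns):
        reply = await llm(messages)
        messages.append(reply)
        calls = reply["tool_calls"]
        if not calls:                      # no tool calls: the model is done
            return reply["content"], messages
        # Run every tool call from this turn concurrently.
        results = await asyncio.gather(
            *(TOOLS[c["name"]](**c["args"]) for c in calls)
        )
        for call, result in zip(calls, results):
            messages.append({"role": "tool", "tool_call_id": call["id"],
                             "content": json.dumps(result)})
    raise RuntimeError("max_turns exceeded")  # the docs name this MaxTurnsError

text, history = asyncio.run(
    agent_loop(fake_llm, [{"role": "user", "content": "Weather in Tokyo and Osaka?"}])
)
print(text)
```
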

Loop behaviour

  • Streams via litellm.acompletion(stream=True) and buffers tool call deltas by index
  • Executes all tool calls in a single turn in parallel via asyncio.gather
  • Injects relevant memories before the first LLM call (if agent.memory is set)
  • Wraps the LLM call with retry/fallback logic (if agent.retry_policy or agent.fallback_chain is set)
  • Delegates to run_agent_loop() transparently when agent.loop_config is set
  • Exits when the model produces no tool calls, or when max_turns is reached (MaxTurnsError)
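
The first bullet mentions buffering tool call deltas by index. In streamed responses, a tool call's name and JSON arguments typically arrive in fragments, so they have to be merged before the call can run. A minimal sketch of that merge, assuming a simplified delta shape (plain dicts with index, id, name, and arguments keys) rather than LiteLLM's actual chunk objects:

```python
import json

def buffer_tool_calls(deltas):
    """Merge streamed tool-call fragments into complete calls, keyed by index."""
    buffers = {}
    for delta in deltas:
        buf = buffers.setdefault(
            delta["index"], {"id": None, "name": "", "arguments": ""}
        )
        if delta.get("id"):
            buf["id"] = delta["id"]
        buf["name"] += delta.get("name", "")
        buf["arguments"] += delta.get("arguments", "")
    # Arguments are only valid JSON once every fragment has arrived.
    return [
        {"id": b["id"], "name": b["name"], "args": json.loads(b["arguments"])}
        for _, b in sorted(buffers.items())
    ]

# Fragments for two interleaved tool calls (hypothetical data).
deltas = [
    {"index": 0, "id": "call_1", "name": "get_weather", "arguments": '{"ci'},
    {"index": 1, "id": "call_2", "name": "search_web", "arguments": ""},
    {"index": 0, "arguments": 'ty": "Tokyo"}'},
    {"index": 1, "arguments": '{"query": "rain"}'},
]
calls = buffer_tool_calls(deltas)
print(calls)
```
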

Streaming

Every part of deepcrew-ai emits StreamEvent objects. Consume them as an async generator, or encode them as Server-Sent Events for web clients.

With FastAPI

app.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from deepcrew import Agent, Orchestrator, ApexConfig

app = FastAPI()
orch = Orchestrator(
    agents=[Agent("assistant", model="openai/gpt-4o", system_prompt="Be helpful.")],
    apex_config=ApexConfig(cite_sources=True),
)

@app.post("/chat")
async def chat(query: str):
    async def event_stream():
        async for event in orch.stream(query):
            yield event.to_sse()   # "event: text_delta\ndata: {...}\n\n"
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
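
The comment in the example shows the wire format that to_sse() produces: an event line, a data line, and a blank line. As an illustration of that framing only, here is a small encoder. SketchEvent is a hypothetical class, not the library's StreamEvent, and including agent_id in the JSON payload is an assumption.

```python
import json
from dataclasses import dataclass, field

@dataclass
class SketchEvent:
    event: str
    agent_id: str
    data: dict = field(default_factory=dict)

    def to_sse(self) -> str:
        # json.dumps escapes newlines, so the payload always fits on one data line.
        payload = json.dumps({"agent_id": self.agent_id, **self.data})
        return f"event: {self.event}\ndata: {payload}\n\n"

frame = SketchEvent("text_delta", "assistant", {"chunk": "Hello"}).to_sse()
print(frame, end="")
```

Serializing the payload as JSON matters here: a raw newline inside a chunk would otherwise end the data field early and corrupt the frame.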

With asyncio.Queue

python
import asyncio
from deepcrew import Agent, run_agent
from deepcrew.types import EventType

async def main():
    queue = asyncio.Queue()
    agent = Agent("bot", model="openai/gpt-4o")

    # Run the agent in the background so events can be consumed as they arrive.
    task = asyncio.create_task(
        run_agent(agent, [{"role": "user", "content": "Hi"}], queue=queue)
    )

    while True:
        event = await queue.get()
        if event is None:          # None marks the end of the stream
            break
        if event.event == EventType.TEXT_DELTA:
            print(event.data["chunk"], end="", flush=True)
        elif event.event == EventType.APEX_DONE:
            print(f"\nConfidence: {event.data['confidence']:.2f}")

    await task

asyncio.run(main())
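
The consumer loop above depends on a None sentinel to know when the stream has ended. The producer side of that pattern can be sketched with the standard library alone. Here produce and consume are hypothetical helpers, and the events are plain dicts. Whether run_agent itself guarantees the sentinel after a failure is not stated in these docs, so the try/finally is a defensive choice for the sketch.

```python
import asyncio

async def produce(queue, chunks):
    """Hypothetical producer: emit events, then always emit the None sentinel."""
    try:
        for chunk in chunks:
            await queue.put({"event": "text_delta", "data": {"chunk": chunk}})
    finally:
        await queue.put(None)   # lets the consumer exit even if the producer raised

async def consume(queue):
    parts = []
    while (event := await queue.get()) is not None:
        parts.append(event["data"]["chunk"])
    return "".join(parts)

async def main():
    queue = asyncio.Queue()
    task = asyncio.create_task(produce(queue, ["Hel", "lo", "!"]))
    text = await consume(queue)
    await task                  # re-raises any producer exception
    return text

text = asyncio.run(main())
print(text)
```
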
Orchestration

Workflow Builder

Build explicit agent pipelines as a directed acyclic graph. Independent nodes at each level run in parallel automatically.

python
from deepcrew import Agent, WorkflowBuilder, ObservabilityConfig

obs = ObservabilityConfig(otel_endpoint="http://localhost:4317")

workflow = (
    WorkflowBuilder(observability=obs)   # v0.2.0: OTel tracing per step
    .add_agent("research", Agent("researcher", model="openai/gpt-4o-mini",
               system_prompt="Research the topic."), task="{input}")
    .add_agent("critique", Agent("critic", model="anthropic/claude-haiku-4-5-20251001",
               system_prompt="Find gaps."), task="Critique:\n{research}")
    .add_agent("expand",   Agent("expander", model="gemini/gemini-2.0-flash",
               system_prompt="Add technical depth."), task="Expand:\n{research}")
    .add_agent("report",   Agent("writer", model="openai/gpt-4o",
               system_prompt="Write a polished report."),
               task="Write a report.\n\nResearch:\n{research}\n\nCritique:\n{critique}\n\nDetails:\n{expand}")
    .then("research", "critique")
    .then("research", "expand")
    .then("critique", "report")
    .then("expand",   "report")
)

# research → [critique + expand in parallel] → report
result = await workflow.run("Future of renewable energy")
print(result.final_output.text)
print(f"Total tokens: {result.total_tokens}")
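
The level-by-level scheduling in the comment above (research, then critique and expand together, then report) is a topological traversal of the graph. A possible standard-library sketch uses graphlib.TopologicalSorter with asyncio.gather. This shows the general technique rather than WorkflowBuilder's internals; fake_agent, run_dag, and the shortened task templates are hypothetical.

```python
import asyncio
from graphlib import TopologicalSorter

# node -> set of predecessors, mirroring the .then() edges in the example
GRAPH = {
    "research": set(),
    "critique": {"research"},
    "expand": {"research"},
    "report": {"critique", "expand"},
}
TASKS = {
    "research": "{input}",
    "critique": "Critique:\n{research}",
    "expand": "Expand:\n{research}",
    "report": "Report on:\n{critique}\n{expand}",
}

async def fake_agent(name, prompt):
    """Hypothetical stand-in for an agent run."""
    await asyncio.sleep(0)
    return f"<{name} output>"

async def run_dag(initial_input):
    outputs = {"input": initial_input}
    levels = []
    sorter = TopologicalSorter(GRAPH)
    sorter.prepare()                        # raises CycleError if the graph has a cycle
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes whose predecessors are all done
        levels.append(ready)
        # Fill each template from earlier outputs, then run the level concurrently.
        prompts = [TASKS[n].format(**outputs) for n in ready]
        results = await asyncio.gather(
            *(fake_agent(n, p) for n, p in zip(ready, prompts))
        )
        outputs.update(zip(ready, results))
        sorter.done(*ready)
    return levels, outputs

levels, outputs = asyncio.run(run_dag("Future of renewable energy"))
print(levels)
```
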

Streaming workflow events

python
from deepcrew.types import EventType

async for event in workflow.stream("My topic"):
    if event.event == EventType.STEP_START:
        print(f"▶ {event.data['node']} starting...")
    elif event.event == EventType.TEXT_DELTA:
        print(event.data["chunk"], end="", flush=True)
    elif event.event == EventType.STEP_DONE:
        print(f"\n✓ {event.data['node']} done")
    elif event.event == EventType.DONE:
        print("\n\nFinal answer ready")

Orchestrator

Automated mode: a router LLM decides which agents to run, they execute in parallel, and APEX synthesizes the results with confidence scoring.

python
from deepcrew import Agent, Orchestrator, ApexConfig, tool

@tool
def search_web(query: str) -> str: ...
@tool
def run_code(code: str) -> str: ...

orch = Orchestrator(
    agents=[
        Agent("researcher", model="openai/gpt-4o-mini",
              system_prompt="Gather comprehensive facts."),
        Agent("analyst",    model="anthropic/claude-haiku-4-5-20251001",
              system_prompt="Evaluate trends and risks."),
        Agent("coder",      model="gemini/gemini-2.0-flash",
              system_prompt="Write and test code examples."),
    ],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(
        cite_sources=True,        # adds [source: agent_name] inline
        confidence_threshold=0.8,
        allow_tools=False,
    ),
    global_tools=[search_web, run_code],  # v0.2.0: router allocates per agent
    enable_spawn=True,                     # v0.2.0: agents can spawn sub-agents
    max_parallel_agents=5,
)

result = await orch.run("What are the key challenges of AI in healthcare?")
print(result.final_text)

# Streaming
async for event in orch.stream("..."):
    print(event.to_sse(), end="")
See the full v0.2.0 Features page for detailed docs on APEX, agent spawning, and intelligent tool allocation.
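
The max_parallel_agents setting in the example caps how many agents run at once. One common way to get that behaviour is an asyncio.Semaphore around each agent run. The sketch below assumes that approach and is not taken from the Orchestrator source; run_agents_bounded is a hypothetical helper, and the short sleep stands in for a real agent run.

```python
import asyncio

async def run_agents_bounded(names, limit):
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def run_one(name):
        nonlocal running, peak
        async with semaphore:            # at most `limit` bodies run at once
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)    # hypothetical stand-in for an agent run
            running -= 1
            return f"{name}: done"

    # gather preserves input order in its results, regardless of finish order.
    results = await asyncio.gather(*(run_one(n) for n in names))
    return results, peak

results, peak = asyncio.run(
    run_agents_bounded([f"agent{i}" for i in range(8)], limit=5)
)
print(peak, len(results))
```
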
MCP Tools

Model Context Protocol

deepcrew-ai supports all three MCP transport types: stdio, streamable HTTP, and SSE. Attach any combination to an agent via mcps=[...]. Tools are discovered in parallel on first use.

| Transport | Class | Use case |
| --- | --- | --- |
| stdio | StdioMCP | Subprocess MCP servers (npx, python, uvx) |
| streamable-HTTP | HTTPMCP | Remote MCP servers (modern protocol) |
| SSE | SSEMCP | Legacy /sse + /messages protocol |
| multi | MCPManager | Aggregate any mix into one interface |
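
"Discovered in parallel on first use" and "aggregate any mix into one interface" can be pictured as a lazily built index from tool name to owning client. The sketch below illustrates that idea with stand-in objects. FakeClient and LazyToolIndex are hypothetical and do not reflect how MCPManager is written.

```python
import asyncio

class FakeClient:
    """Hypothetical stand-in for an MCP client with an async list_tools()."""
    def __init__(self, name, tools):
        self.name = name
        self._tools = tools
        self.list_calls = 0

    async def list_tools(self):
        self.list_calls += 1
        await asyncio.sleep(0)
        return list(self._tools)

class LazyToolIndex:
    def __init__(self, clients):
        self._clients = clients
        self._index = None   # tool name -> owning client, built on first use

    async def tools(self):
        if self._index is None:
            # Query every client concurrently instead of one after another.
            listings = await asyncio.gather(
                *(c.list_tools() for c in self._clients)
            )
            self._index = {tool: client
                           for client, listing in zip(self._clients, listings)
                           for tool in listing}
        return self._index

async def main():
    fs = FakeClient("fs", ["read_file", "list_dir"])
    search = FakeClient("search", ["search"])
    index = LazyToolIndex([fs, search])
    first = await index.tools()
    second = await index.tools()   # served from the cache, no second discovery
    return sorted(first), first["search"].name, fs.list_calls, first is second

names, owner, calls, cached = asyncio.run(main())
print(names, owner, calls, cached)
```

A production version would also need a lock around the first discovery so that two concurrent callers do not both trigger it.
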

StdioMCP

python
from deepcrew import Agent, run_agent
from deepcrew.mcp import StdioMCP

async with StdioMCP(
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "."],
    env={"SOME_VAR": "value"},
) as mcp:
    agent = Agent("file_agent", model="openai/gpt-4o",
                  system_prompt="Help with file operations.", mcps=[mcp])
    result = await run_agent(agent, [{"role": "user", "content": "List Python files."}])

HTTPMCP

python
from deepcrew.mcp import HTTPMCP

async with HTTPMCP(
    url="https://my-mcp.example.com/mcp",
    headers={"Authorization": "Bearer sk-..."},
    timeout=30,
    retries=2,
) as mcp:
    tools = await mcp.list_tools()
    result = await mcp.call_tool("search", {"query": "hello world"})

SSEMCP

python
from deepcrew.mcp import SSEMCP

async with SSEMCP("http://localhost:3000", headers={"X-API-Key": "secret"}) as mcp:
    tools = await mcp.list_tools()
    result = await mcp.call_tool("my_tool", {"arg": "value"})

MCPManager

python
from deepcrew import Agent, run_agent
from deepcrew.mcp import MCPManager, StdioMCP, HTTPMCP

async with MCPManager([
    StdioMCP("npx", ["-y", "@modelcontextprotocol/server-filesystem", "."]),
    HTTPMCP("https://search-mcp.example.com/mcp"),
    HTTPMCP("https://calendar-mcp.example.com/mcp"),
]) as manager:
    agent = Agent("super_agent", model="openai/gpt-4o",
                  system_prompt="Use all available tools.", mcps=[manager])
    result = await run_agent(agent, [{"role": "user", "content": "..."}])
Reference

API Reference

All public names exported from the deepcrew package. Names new in v0.2.0 are marked v0.2.

run_agent()

signature
async def run_agent(
    agent: Agent,
    messages: list[dict[str, Any]],
    *,
    tool_defs: list[ToolDef] | None = None,
    queue: asyncio.Queue[StreamEvent | None] | None = None,
    agent_id: str | None = None,
    observability: ObservabilityConfig | None = None,  # v0.2.0
) -> AgentResult

Orchestrator

python
class Orchestrator:
    def __init__(
        self,
        agents: list[Agent],
        router_model: str = "openai/gpt-4o-mini",
        synthesizer_model: str | None = None,    # backward compat alias
        apex_model: str | None = None,            # v0.2.0
        apex_config: ApexConfig | None = None,    # v0.2.0
        router_system_prompt: str | None = None,
        synthesizer_system_prompt: str | None = None,
        max_parallel_agents: int = 5,
        global_tools: list[ToolDef] | None = None, # v0.2.0
        enable_spawn: bool = False,                # v0.2.0
    ) -> None

    async def run(self, query: str, context: dict | None = None) -> OrchestratorResult
    def stream(self, query: str, context: dict | None = None) -> AsyncGenerator[StreamEvent, None]

WorkflowBuilder

python
class WorkflowBuilder:
    def __init__(self, observability: ObservabilityConfig | None = None)  # v0.2.0
    def add_agent(self, name: str, agent: Agent, task: str | Callable = "{input}") -> WorkflowBuilder
    def then(self, predecessor: str, successor: str) -> WorkflowBuilder
    async def run(self, initial_input: str, context: dict | None = None) -> WorkflowResult
    def stream(self, initial_input: str, context: dict | None = None) -> AsyncGenerator[StreamEvent, None]

v0.2.0 classes

python
# APEX
class APEXSynthesizer:
    def __init__(self, model: str, config: ApexConfig | None = None)
    async def synthesize(self, query: str, results: list[AgentResult], ...) -> AgentResult

@dataclass class ApexConfig:
    confidence_threshold: float = 0.7
    cite_sources: bool = True
    allow_tools: bool = False
    system_prompt: str | None = None

# Loop
@dataclass class LoopConfig:
    max_iterations: int = 5
    convergence_fn: Callable[[AgentResult], bool] | None = None
    stop_condition: Callable[[AgentResult], bool] | None = None
    refine_prompt: str = "..."

async def run_agent_loop(agent, messages, *, tool_defs=None, queue=None, agent_id=None) -> AgentResult
async def search_loop(query, search_tool, *, agent, max_iterations=3, confidence_threshold=0.8, queue=None) -> AgentResult

# Memory
class MemoryProvider(ABC):  # store / retrieve / search / clear
class InMemoryProvider(MemoryProvider)
class FileMemoryProvider(MemoryProvider):
    def __init__(self, path: str | Path)

# Retry
@dataclass class RetryPolicy:
    max_retries: int = 3
    backoff_seconds: float = 1.0
    retry_on: tuple[type[Exception], ...] = (Exception,)
    exponential: bool = True

@dataclass class FallbackChain:
    models: list[str]

# Skills
class Skill(ABC):  # name / description / parameters / execute(**kwargs) -> str / to_tool_def()
def skill(fn=None, *, name=None, description=None)  # decorator
class SkillRegistry:  # register / get / list_all / clear
class WebSearchSkill(Skill)
class SummarizeSkill(Skill):
    def __init__(self, model: str = "openai/gpt-4o-mini")
class CodeExecutionSkill(Skill):
    def __init__(self, timeout: float = 10.0)

# Spawning
@dataclass class SpawnRequest:
    task: str
    tools: list[str] = field(default_factory=list)  # empty by default; a bare [] default is rejected by @dataclass
    model: str | None = None
    system_prompt: str | None = None
    max_turns: int = 5

class ToolAllocator:
    def __init__(self, router_model: str)
    async def allocate(self, task: str, all_tools: list[ToolDef], max_tools: int = 10) -> list[ToolDef]

async def spawn_agent(request, all_tool_defs, parent_queue=None, router_model=..., parent_agent_id=...) -> AgentResult
def make_spawn_tool(all_tool_defs, parent_queue, router_model, parent_agent_id) -> ToolDef

# Observability
@dataclass class ObservabilityConfig:
    otel_endpoint: str | None = None
    service_name: str = "deepcrew"
    enabled: bool = True
    export_format: Literal["grpc","http"] = "grpc"
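
The LoopConfig fields listed above (max_iterations, convergence_fn, refine_prompt) suggest an outer loop of the form run agent → check convergence → re-prompt → repeat. Here is one way that loop could look, as a sketch under stated assumptions: Draft, fake_agent, and refine_loop are hypothetical, only convergence_fn is modelled, and the exact semantics of stop_condition are not covered because these docs do not define them.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Draft:
    text: str
    confidence: float

async def fake_agent(messages):
    """Hypothetical agent: confidence rises with each refinement request."""
    rounds = sum(m["role"] == "user" for m in messages)
    return Draft(text=f"draft v{rounds}", confidence=min(1.0, 0.5 + 0.2 * rounds))

async def refine_loop(agent, messages, max_iterations=5, convergence_fn=None,
                      refine_prompt="Improve the previous answer."):
    messages = list(messages)              # do not mutate the caller's history
    for iteration in range(1, max_iterations + 1):
        result = await agent(messages)
        if convergence_fn is not None and convergence_fn(result):
            break                          # good enough: stop refining
        # Feed the draft back and ask for another pass.
        messages += [{"role": "assistant", "content": result.text},
                     {"role": "user", "content": refine_prompt}]
    return result, iteration

result, iterations = asyncio.run(refine_loop(
    fake_agent,
    [{"role": "user", "content": "Summarize the report."}],
    convergence_fn=lambda r: r.confidence >= 0.85,
))
print(result.text, iterations)
```
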

Result types

python
@dataclass class AgentResult:
    agent_id: str
    text: str
    tool_calls: list[dict]
    input_tokens: int
    output_tokens: int
    model: str
    confidence: float | None    # v0.2.0 — populated by APEX
    loop_iterations: int        # v0.2.0 — outer loop count
    total_tokens: int           # property

@dataclass class WorkflowResult:
    outputs: dict[str, AgentResult]
    final_output: AgentResult | None
    total_input_tokens: int
    total_output_tokens: int
    total_tokens: int  # property

@dataclass class OrchestratorResult:
    final_text: str
    agent_results: list[AgentResult]
    router_result: AgentResult | None
    total_input_tokens: int
    total_output_tokens: int
    total_tokens: int  # property

Stream Events

Every event has event (EventType), agent_id (str), and data (dict). Call .to_sse() to encode as a Server-Sent Event string.

| Event | data keys | Emitted by |
| --- | --- | --- |
| agent_start | model | run_agent at loop start |
| text_delta | chunk | Each streamed text token |
| thinking_delta | chunk | Thinking/reasoning tokens (where supported) |
| tool_call | tool, args | Before tool execution |
| tool_result | tool, result | After tool execution |
| agent_done | input_tokens, output_tokens | Agent loop complete |
| apex_start | agents | APEX synthesis begins (v0.2) |
| apex_done | confidence | APEX synthesis complete (v0.2) |
| loop_iteration | iteration, converged | Each outer loop iteration (v0.2) |
| spawn_agent | task, requested_tools | Sub-agent spawned (v0.2) |
| memory_retrieve | count | Memories injected to context (v0.2) |
| memory_store | key | Tool result stored to memory (v0.2) |
| retry_attempt | attempt, model, delay | Before a retry (v0.2) |
| fallback_triggered | from_model, to_model | Model switch (v0.2) |
| step_start | node | WorkflowBuilder: node begins |
| step_done | node | WorkflowBuilder: node finished |
| error | message | Any exception |
| done | final_text | Entire run complete |
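
Using the event names and data keys from the table, a consumer can fold a stream into per-agent text and token totals. The sketch below works on plain dicts so it runs without the library; summarize is a hypothetical helper and the sample events are invented. With real StreamEvent objects the same logic would read event.event, event.agent_id, and event.data instead.

```python
def summarize(events):
    """Fold a sequence of events into streamed text and token totals."""
    chunks_by_agent = {}
    tokens = {"input": 0, "output": 0}
    for ev in events:
        kind, data = ev["event"], ev["data"]
        if kind == "text_delta":
            chunks_by_agent.setdefault(ev["agent_id"], []).append(data["chunk"])
        elif kind == "agent_done":
            tokens["input"] += data["input_tokens"]
            tokens["output"] += data["output_tokens"]
        elif kind == "error":
            raise RuntimeError(data["message"])
        # Other event types are ignored in this sketch.
    texts = {agent: "".join(chunks) for agent, chunks in chunks_by_agent.items()}
    return texts, tokens

events = [
    {"event": "agent_start", "agent_id": "researcher",
     "data": {"model": "openai/gpt-4o-mini"}},
    {"event": "text_delta", "agent_id": "researcher", "data": {"chunk": "Solar "}},
    {"event": "text_delta", "agent_id": "researcher", "data": {"chunk": "is growing."}},
    {"event": "agent_done", "agent_id": "researcher",
     "data": {"input_tokens": 12, "output_tokens": 4}},
]
texts, tokens = summarize(events)
print(texts, tokens)
```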