pip install deepcrew-ai

Multi-agent AI,
without the boilerplate.

Build parallel AI workflows using any LLM provider. Attach tools via MCP, stream events in real time, and let agents collaborate — automatically or through explicit DAG workflows.

quickstart.py
import asyncio
from deepcrew import Agent, WorkflowBuilder, tool

@tool
def search(query: str) -> str:
    "Search the web."
    return f"Results for: {query}"   # stub: replace with a real search call

researcher = Agent("researcher", model="openai/gpt-4o-mini",
                   system_prompt="Research thoroughly.", tools=[search])
writer     = Agent("writer",     model="openai/gpt-4o",
                   system_prompt="Write clearly.")

async def main():
    result = await (
        WorkflowBuilder()
        .add_agent("research", researcher, task="{input}")
        .add_agent("write",    writer,    task="Write about:\n{research}")
        .then("research", "write")   # write starts after research finishes
        .run("The future of AI agents")
    )
    print(result.final_output.text)

asyncio.run(main())
Diagram: an Orchestrator (gpt-4o-mini) routes the request to researcher (gpt-4o), analyst (claude-haiku), and writer (gpt-4o), which run in parallel via asyncio.gather; a Synthesizer then produces the final response.
100+ LLM providers · 3 MCP transports · zero custom provider code

Parallel by default

Independent agents and tool calls always run concurrently via asyncio.gather — no extra configuration needed.

100+ LLM providers

LiteLLM handles OpenAI, Anthropic, Gemini, Bedrock, Azure, Ollama, and more — same API, zero provider-specific code.

MCP tool integration

Attach tools from any MCP server — subprocess stdio, legacy SSE, or modern streamable-HTTP. Or use plain Python functions.

SSE streaming

Every event — text deltas, tool calls, agent completions — streams in real time. Drop-in compatible with FastAPI StreamingResponse.

Workflow Builder

Declare agent dependencies as a DAG. A topological sort groups nodes into dependency levels, and the independent nodes within each level run in parallel automatically.

Automated routing

Let a router LLM decide which agents to run and what task to give each. When more than one agent runs, a synthesizer merges their results.

Getting started

Installation

Requires Python 3.11+. Install from PyPI:

shell
pip install deepcrew-ai

# with dev dependencies (pytest, respx)
pip install "deepcrew-ai[dev]"

Set your API key

deepcrew-ai uses LiteLLM under the hood, so you set provider credentials the same way you would for any LiteLLM project — environment variables:

shell
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
export GEMINI_API_KEY="..."

Your first agent

first_agent.py
import asyncio
from deepcrew import Agent, run_agent, tool

@tool
def get_weather(city: str) -> dict:
    "Get current weather for a city."
    return {"city": city, "temperature": 22, "condition": "sunny"}

async def main():
    agent = Agent(
        name="assistant",
        model="openai/gpt-4o-mini",
        system_prompt="You are a helpful weather assistant.",
        tools=[get_weather],
    )
    result = await run_agent(
        agent,
        [{"role": "user", "content": "What's the weather in Tokyo?"}],
    )
    print(result.text)
    print(f"Tokens: {result.input_tokens}in / {result.output_tokens}out")

asyncio.run(main())
Core

Agents

An Agent is a dataclass that bundles a model, system prompt, and the tools it can use. Agents are stateless — you pass conversation history into run_agent() each time.

python
from deepcrew import Agent
from deepcrew.mcp import HTTPMCP

mcp = HTTPMCP("https://my-mcp.example.com/mcp")

agent = Agent(
    name="researcher",
    model="anthropic/claude-opus-4-8",     # any litellm string
    system_prompt="You are a research specialist.",
    mcps=[mcp],                            # MCP tool servers
    tools=[my_fn, another_fn],             # your own Python @tool functions
    max_turns=10,                          # LLM→tool→LLM cycles
    temperature=None,                      # Anthropic extended thinking rejects custom temperatures
    max_tokens=8192,                       # must exceed the thinking budget_tokens
    extra_params={                         # forwarded to litellm verbatim
        "thinking": {"type": "enabled", "budget_tokens": 5000},
    },
)

Agent parameters

Parameters marked * are required.

name* (str): Unique identifier used in logs, events, and result objects.
model* (str): LiteLLM model string: "openai/gpt-4o", "anthropic/claude-opus-4-8", "gemini/gemini-2.0-flash", etc.
system_prompt (str): Injected as the first system message before conversation history.
mcps (list[MCPClient]): MCP clients whose tools this agent can call. Tools are discovered lazily on first use.
tools (list[Callable]): Python functions (decorated with @tool or plain). JSON Schema is auto-generated from type hints.
max_turns (int = 10): Maximum LLM→tool→LLM cycles. MaxTurnsError is raised if exceeded.
temperature (float | None): Sampling temperature. None uses the provider's default.
max_tokens (int | None): Maximum output tokens. None uses the provider's default.
extra_params (dict): Any extra kwargs, forwarded verbatim to litellm.acompletion.

Model strings

python
# OpenAI
Agent("a", model="openai/gpt-4o")
Agent("a", model="openai/gpt-4o-mini")

# Anthropic
Agent("a", model="anthropic/claude-opus-4-8")
Agent("a", model="anthropic/claude-haiku-4-5-20251001")

# Google
Agent("a", model="gemini/gemini-2.0-flash")
Agent("a", model="gemini/gemini-2.5-pro")

# AWS Bedrock
Agent("a", model="bedrock/anthropic.claude-opus-4-8-20250514-v1:0")

# Azure OpenAI
Agent("a", model="azure/gpt-4o", extra_params={"api_base": "https://..."})

# Local via Ollama
Agent("a", model="ollama/llama3.2")

Python Function Tools

Decorate any Python function with @tool to make it available as an agent tool. The JSON Schema is generated automatically from type hints.

python
from deepcrew import tool, Agent, run_agent

@tool
def search_web(query: str, max_results: int = 5) -> list[str]:
    """Search the web and return a list of URLs.

    Args:
        query (str): The search query.
        max_results (int): Maximum number of results to return.
    """
    # your implementation
    return ["https://example.com/..."]


@tool(name="db_lookup", description="Query the internal database")
def lookup_database(table: str, id: int) -> dict:
    return {"id": id, "data": "..."}


agent = Agent(
    name="assistant",
    model="openai/gpt-4o",
    tools=[search_web, lookup_database],
)

Supported types

Python type         JSON Schema
str                 {"type": "string"}
int                 {"type": "integer"}
float               {"type": "number"}
bool                {"type": "boolean"}
list[str]           {"type": "array", "items": {"type": "string"}}
dict[str, int]      {"type": "object", "additionalProperties": {"type": "integer"}}
Optional[str]       {"type": ["string", "null"]}
Literal["a", "b"]   {"enum": ["a", "b"]}
Note: Google-style docstring parameter descriptions (Args: param (type): description) are automatically extracted and added to each parameter's JSON Schema.

Async tools

Async functions work natively — the runner awaits them automatically:

python
import httpx
from deepcrew import tool

@tool
async def fetch_url(url: str) -> str:
    "Fetch the content of a URL."
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        return r.text[:2000]
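One way a runner can treat sync and async tools uniformly is to check whether the function is a coroutine function and, if not, run it off the event loop. This is a sketch under assumptions: invoke_tool is a hypothetical helper, and offloading sync tools to asyncio.to_thread is a design choice shown for illustration; this page does not say how deepcrew-ai runs sync tools.

```python
import asyncio
import inspect
from typing import Any, Callable


async def invoke_tool(fn: Callable[..., Any], args: dict[str, Any]) -> Any:
    """Call a sync or async tool with keyword arguments and return its result."""
    if inspect.iscoroutinefunction(fn):
        return await fn(**args)  # async tool: await it directly
    # Sync tool: run it in a worker thread so blocking I/O doesn't stall the event loop
    return await asyncio.to_thread(fn, **args)


def add(a: int, b: int) -> int:
    return a + b


async def shout(text: str) -> str:
    await asyncio.sleep(0)  # stand-in for real async I/O such as an HTTP request
    return text.upper()


async def main() -> list[Any]:
    # Independent tool calls run concurrently, whether sync or async
    return await asyncio.gather(
        invoke_tool(add, {"a": 2, "b": 3}),
        invoke_tool(shout, {"text": "hi"}),
    )


results = asyncio.run(main())
```

asyncio.gather returns results in argument order, so results is [5, "HI"].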

Runner

run_agent() executes a single agent's agentic loop: call the LLM, buffer streamed tool calls, execute them in parallel, append results, and repeat.

python
from deepcrew import Agent, run_agent

agent = Agent("assistant", model="openai/gpt-4o")

result = await run_agent(
    agent,
    messages=[{"role": "user", "content": "Hello!"}],
    tool_defs=None,   # optional: pre-fetched ToolDef list
    queue=None,       # optional: asyncio.Queue for streaming events
    agent_id=None,    # optional: override agent_id in events
)

print(result.text)           # final text output
print(result.tool_calls)     # list of all tool calls made
print(result.input_tokens)   # LLM input token count
print(result.output_tokens)  # LLM output token count

How the loop works

  • Calls litellm.acompletion(..., stream=True) and buffers streamed tool call deltas by index
  • When the stream ends with tool calls buffered, executes all of them in parallel via asyncio.gather
  • Appends the assistant message (with tool_calls) and tool results to history, then loops
  • Exits when the model produces a response with no tool calls; if max_turns cycles pass without one, MaxTurnsError is raised
  • Each event (text delta, tool call, tool result, etc.) is put into queue if provided
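The loop described above can be sketched offline with a scripted model. This is a simplified illustration, not deepcrew-ai's runner: fake_llm stands in for litellm.acompletion(..., stream=True) and yields plain dicts shaped like OpenAI-style streaming deltas (an assumption), tools are async functions, and MaxTurnsExceeded is a hypothetical stand-in for MaxTurnsError. Event emission to a queue is omitted.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Awaitable, Callable

Message = dict[str, Any]
StreamingLLM = Callable[[list[Message]], AsyncIterator[dict[str, Any]]]
AsyncTool = Callable[..., Awaitable[Any]]


class MaxTurnsExceeded(RuntimeError):
    """Hypothetical stand-in for deepcrew's MaxTurnsError."""


async def agent_loop(llm: StreamingLLM, tools: dict[str, AsyncTool],
                     messages: list[Message], max_turns: int = 10) -> str:
    history = list(messages)
    for _ in range(max_turns):
        text, buffered = "", {}  # buffered: stream index -> partial tool call
        async for delta in llm(history):
            text += delta.get("content") or ""
            for tc in delta.get("tool_calls") or []:
                call = buffered.setdefault(tc["index"], {"id": "", "name": "", "arguments": ""})
                fn = tc.get("function", {})
                call["id"] = tc.get("id") or call["id"]
                call["name"] = fn.get("name") or call["name"]
                call["arguments"] += fn.get("arguments") or ""  # JSON arrives in fragments
        if not buffered:
            return text  # no tool calls: this is the final answer
        calls = [buffered[i] for i in sorted(buffered)]
        history.append({"role": "assistant", "content": text or None, "tool_calls": [
            {"id": c["id"], "type": "function",
             "function": {"name": c["name"], "arguments": c["arguments"]}}
            for c in calls]})
        # Execute every buffered tool call concurrently
        results = await asyncio.gather(
            *(tools[c["name"]](**json.loads(c["arguments"] or "{}")) for c in calls))
        for c, result in zip(calls, results):
            history.append({"role": "tool", "tool_call_id": c["id"], "content": json.dumps(result)})
    raise MaxTurnsExceeded(f"no final answer after {max_turns} turns")


# A scripted fake model so the loop can run without network access
seen_cities: list[str] = []


async def get_weather(city: str) -> dict[str, str]:
    seen_cities.append(city)
    return {"city": city, "condition": "sunny"}


async def fake_llm(history: list[Message]) -> AsyncIterator[dict[str, Any]]:
    if history[-1]["role"] == "user":
        # Turn 1: one tool call whose JSON arguments are split across two deltas
        yield {"tool_calls": [{"index": 0, "id": "call_1",
                               "function": {"name": "get_weather", "arguments": '{"city": '}}]}
        yield {"tool_calls": [{"index": 0, "function": {"arguments": '"Tokyo"}'}}]}
    else:
        # Turn 2: answer from the tool result appended to history
        weather = json.loads(history[-1]["content"])
        yield {"content": f"{weather['condition'].capitalize()} in "}
        yield {"content": f"{weather['city']}."}


answer = asyncio.run(agent_loop(fake_llm, {"get_weather": get_weather},
                                [{"role": "user", "content": "Weather in Tokyo?"}]))
```

The fragmented arguments are reassembled into {"city": "Tokyo"} before the tool runs, and the second turn returns "Sunny in Tokyo." with no tool calls, ending the loop.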

Streaming

run_agent(), WorkflowBuilder, and Orchestrator all emit StreamEvent objects. You can consume them from an async generator (stream()) or an asyncio.Queue, and encode any event as a Server-Sent Event with to_sse().

With FastAPI

app.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from deepcrew import Agent, Orchestrator

app = FastAPI()
orch = Orchestrator([
    Agent("assistant", model="openai/gpt-4o", system_prompt="Be helpful."),
])

@app.post("/chat")
async def chat(query: str):
    async def event_stream():
        async for event in orch.stream(query):
            yield event.to_sse()   # "event: text_delta\ndata: {...}\n\n"
    return StreamingResponse(event_stream(), media_type="text/event-stream")

With asyncio.Queue

python
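import asyncio
from deepcrew import Agent, run_agent
from deepcrew.types import EventType

async def main():
    queue = asyncio.Queue()
    agent = Agent("bot", model="openai/gpt-4o")

    # Run the agent in the background and read its events in real time
    task = asyncio.create_task(
        run_agent(agent, [{"role": "user", "content": "Hi"}], queue=queue)
    )
    # Push a None sentinel when the run ends (even on error) so the loop below
    # always terminates; an extra None is harmless if run_agent sends its own
    task.add_done_callback(lambda _: queue.put_nowait(None))

    while True:
        event = await queue.get()
        if event is None:
            break
        if event.event == EventType.TEXT_DELTA:
            print(event.data["chunk"], end="", flush=True)

    await task   # re-raises any exception from run_agent

asyncio.run(main())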
Orchestration

Workflow Builder

Build explicit agent pipelines as a directed acyclic graph. Nodes at the same dependency level run in parallel automatically.

python
from deepcrew import Agent, WorkflowBuilder

researcher = Agent("researcher", model="openai/gpt-4o-mini",
                   system_prompt="Research the topic.")
critic     = Agent("critic",     model="openai/gpt-4o-mini",
                   system_prompt="Find gaps and weaknesses.")
expander   = Agent("expander",   model="openai/gpt-4o-mini",
                   system_prompt="Add technical depth.")
writer     = Agent("writer",     model="openai/gpt-4o",
                   system_prompt="Write a polished report.")

workflow = (
    WorkflowBuilder()
    .add_agent("research", researcher, task="{input}")
    .add_agent("critique", critic,   task="Critique:\n{research}")
    .add_agent("expand",   expander, task="Expand:\n{research}")
    .add_agent("report",   writer,
               task="Write a report.\n\n"
                    "Research:\n{research}\n\n"
                    "Critique:\n{critique}\n\n"
                    "Details:\n{expand}")
    .then("research", "critique")   # critique waits for research
    .then("research", "expand")     # expand waits for research
    .then("critique", "report")     # report waits for both
    .then("expand", "report")
)

# research → [critique + expand in parallel] → report
result = await workflow.run("The future of renewable energy")
print(result.final_output.text)
print(f"Total tokens: {result.total_tokens}")
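Level-by-level execution of this kind can be sketched with Kahn's algorithm plus asyncio.gather. This is an illustration under assumptions, not deepcrew-ai's scheduler: topo_levels, run_levels, and fake_node are hypothetical, and the library may schedule differently (for example, starting a node as soon as its own predecessors finish instead of waiting for the whole level). The edges mirror the .then() calls above.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable


def topo_levels(nodes: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Kahn's algorithm, grouped: each level holds nodes whose predecessors are all earlier."""
    indegree = {n: 0 for n in nodes}
    children: dict[str, list[str]] = defaultdict(list)
    for pred, succ in edges:
        children[pred].append(succ)
        indegree[succ] += 1
    levels: list[list[str]] = []
    level = [n for n in nodes if indegree[n] == 0]
    while level:
        levels.append(level)
        next_level = []
        for n in level:
            for child in children[n]:
                indegree[child] -= 1
                if indegree[child] == 0:  # its last predecessor is in this level
                    next_level.append(child)
        level = next_level
    if sum(len(lv) for lv in levels) != len(nodes):
        raise ValueError("workflow graph contains a cycle")
    return levels


async def run_levels(levels: list[list[str]],
                     run_node: Callable[[str, dict[str, str]], Awaitable[str]],
                     initial_input: str) -> dict[str, str]:
    outputs = {"input": initial_input}
    for level in levels:
        # Nodes in one level don't depend on each other, so run them concurrently
        results = await asyncio.gather(*(run_node(n, outputs) for n in level))
        outputs.update(zip(level, results))
    return outputs


async def fake_node(name: str, outputs: dict[str, str]) -> str:
    await asyncio.sleep(0)  # stand-in for an LLM call
    return f"{name} output"


edges = [("research", "critique"), ("research", "expand"),
         ("critique", "report"), ("expand", "report")]
levels = topo_levels(["research", "critique", "expand", "report"], edges)
outputs = asyncio.run(run_levels(levels, fake_node, "renewable energy"))
```

For this graph, levels is [["research"], ["critique", "expand"], ["report"]], matching the research → [critique + expand] → report shape described in the comment above.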

Task templates

The task argument is a Python format string: {input} is replaced by the initial query, and {node_name} by the text output of the predecessor node with that name. You can also pass a callable:

python
# String template
.add_agent("writer", agent, task="Write about:\n{research}\n\nContext:\n{expand}")

# Callable — receives a dict with "input" and all predecessor outputs
.add_agent("writer", agent, task=lambda ctx: (
    f"Topic: {ctx['input']}\n\n"
    f"Research ({len(ctx['research'])} chars): {ctx['research'][:500]}..."
))
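Assuming string templates are rendered with str.format(**ctx), as "Python format string" suggests, rendering a node's task takes only a few lines. render_task is a hypothetical helper, not part of deepcrew-ai. If that assumption holds, two practical consequences follow: a name missing from the context raises KeyError, and literal braces (for example a JSON sample inside a prompt) must be doubled.

```python
from typing import Callable, Union

Task = Union[str, Callable[[dict[str, str]], str]]


def render_task(task: Task, ctx: dict[str, str]) -> str:
    """Render a node's task from {"input": ..., <predecessor name>: <its text output>}."""
    if callable(task):
        return task(ctx)
    # str.format substitutes {input} and {node_name}; a missing name raises KeyError,
    # and literal braces must be doubled ({{ and }})
    return task.format(**ctx)


ctx = {"input": "solar power", "research": "Panel prices keep falling."}
prompt = render_task("Write about {input}:\n{research}", ctx)
json_prompt = render_task('Reply with JSON like {{"title": "..."}} about {input}.', ctx)
shouted = render_task(lambda c: c["input"].upper(), ctx)
```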

Streaming workflow

python
from deepcrew.types import EventType

async for event in workflow.stream("My topic"):
    if event.event == EventType.STEP_START:
        print(f"▶ {event.data['node']} starting...")
    elif event.event == EventType.TEXT_DELTA:
        print(event.data["chunk"], end="", flush=True)
    elif event.event == EventType.STEP_DONE:
        print(f"\n✓ {event.data['node']} done")
    elif event.event == EventType.DONE:
        print(f"\n\nFinal: {event.data['final_text']}")

Orchestrator

The automated mode: a router LLM analyzes your query and the available agents, decides whether to use one agent or fan out to multiple agents running in parallel, then a synthesizer merges the results.

python
from deepcrew import Agent, Orchestrator

agents = [
    Agent("researcher", model="openai/gpt-4o-mini",
          system_prompt="Gather comprehensive facts and background."),
    Agent("analyst",    model="openai/gpt-4o-mini",
          system_prompt="Evaluate trends, risks, and opportunities."),
    Agent("writer",     model="openai/gpt-4o-mini",
          system_prompt="Produce clear, engaging written content."),
]

orch = Orchestrator(
    agents=agents,
    router_model="openai/gpt-4o-mini",
    synthesizer_model="openai/gpt-4o",
    max_parallel_agents=5,
)

# Non-streaming
result = await orch.run("What are the key challenges of AI in healthcare?")
print(result.final_text)
print(result.total_tokens)

# Streaming
async for event in orch.stream("..."):
    print(event.to_sse(), end="")

How routing works

The router calls router_model with a JSON-mode prompt listing all agents. The model returns either:

json — single agent
{
  "route": "single",
  "agent": "researcher",
  "task": "Research quantum computing trends"
}
json — parallel
{
  "route": "parallel",
  "agents": [
    {"name": "researcher", "task": "Gather facts"},
    {"name": "analyst",    "task": "Evaluate trends"}
  ]
}
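Before dispatching, a caller needs to turn either reply into concrete agent/task pairs and reject agent names the router invented. The sketch below shows one way to do that; Assignment and parse_route are hypothetical, and truncating to max_parallel_agents is an assumption, since this page does not say whether deepcrew-ai truncates, rejects, or batches oversized routes.

```python
import json
from dataclasses import dataclass


@dataclass
class Assignment:
    agent: str
    task: str


def parse_route(raw: str, known_agents: set[str], max_parallel: int = 5) -> list[Assignment]:
    """Convert a router reply (either JSON shape above) into agent assignments."""
    decision = json.loads(raw)
    if decision["route"] == "single":
        picks = [Assignment(decision["agent"], decision["task"])]
    elif decision["route"] == "parallel":
        picks = [Assignment(a["name"], a["task"]) for a in decision["agents"]]
    else:
        raise ValueError(f"unknown route: {decision['route']!r}")
    unknown = [p.agent for p in picks if p.agent not in known_agents]
    if unknown:
        raise ValueError(f"router chose unknown agents: {unknown}")
    return picks[:max_parallel]  # assumption: picks beyond the cap are dropped


agents = {"researcher", "analyst", "writer"}
single = parse_route('{"route": "single", "agent": "researcher", '
                     '"task": "Research quantum computing trends"}', agents)
parallel = parse_route('{"route": "parallel", "agents": ['
                       '{"name": "researcher", "task": "Gather facts"}, '
                       '{"name": "analyst", "task": "Evaluate trends"}]}', agents)
```

Validating names up front means a hallucinated agent fails fast with a clear error instead of surfacing later as a missing-key lookup.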

Custom prompts

python
orch = Orchestrator(
    agents=agents,
    router_model="openai/gpt-4o-mini",
    router_system_prompt="Your custom routing instructions...\n\nAgents:\n{agent_descriptions}",
    synthesizer_system_prompt="Merge these results into a single answer.",
)
MCP Tools

Model Context Protocol

deepcrew-ai supports all three MCP transports: stdio, streamable HTTP, and the legacy HTTP+SSE transport. Attach any combination to an agent via mcps=[...]. Tools are discovered in parallel on first use.

Note: all three clients implement the same MCPClient abstract base and support the async context manager protocol: async with StdioMCP(...) as mcp:

Transport         Class        Use case
stdio             StdioMCP     Subprocess MCP servers (npx, python, etc.)
streamable-HTTP   HTTPMCP      Remote MCP servers using the modern protocol (spec 2025-03-26 and later)
SSE               SSEMCP       Legacy servers using the /sse + /messages protocol
multi             MCPManager   Aggregates any mix of the above into one interface

StdioMCP

Spawns a subprocess and communicates via JSON-RPC over stdin/stdout. The standard transport for MCP servers distributed as npm packages or Python scripts.

python
from deepcrew import Agent, run_agent
from deepcrew.mcp import StdioMCP

# Official filesystem MCP server (Node.js / npx)
async with StdioMCP(
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "."],
    env={"SOME_VAR": "value"},   # merged with current environment
) as mcp:
    tools = await mcp.list_tools()
    print([t.name for t in tools])

    agent = Agent(
        name="file_agent",
        model="openai/gpt-4o",
        system_prompt="Help users with file operations.",
        mcps=[mcp],
    )
    result = await run_agent(
        agent,
        [{"role": "user", "content": "List the files in the current directory."}],
    )
    print(result.text)

Constructor parameters

Parameters marked * are required.

command* (str): Executable to run, e.g. "npx", "python", "uvx".
args (list[str]): Command-line arguments passed to the executable.
env (dict | None): Extra environment variables, merged with the current process environment.

HTTPMCP

Modern streamable-HTTP transport (MCP spec 2025-03-26 and later). Manages the Mcp-Session-Id header, automatically re-initializes when a session expires, and retries transient errors with exponential backoff.

python
from deepcrew.mcp import HTTPMCP

async with HTTPMCP(
    url="https://my-mcp.example.com/mcp",
    headers={"Authorization": "Bearer sk-..."},
    timeout=30,
    retries=2,
) as mcp:
    tools = await mcp.list_tools()
    result = await mcp.call_tool("search", {"query": "hello world"})
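Retrying with exponential backoff means waiting roughly twice as long after each failed attempt. A minimal sketch, assuming that retries=2 means up to two retries after the first attempt (the page does not define it precisely); with_backoff and TransientError are hypothetical, and production code usually also adds random jitter and retries only idempotent requests.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for retryable failures (timeouts, 5xx, dropped connections)."""


async def with_backoff(op: Callable[[], Awaitable[T]], retries: int = 2,
                       base_delay: float = 0.5) -> T:
    """Run op, retrying up to `retries` extra times with exponentially growing waits."""
    for attempt in range(retries + 1):
        try:
            return await op()
        except TransientError:
            if attempt == retries:
                raise  # out of retries: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)  # 0.5 s, 1 s, 2 s, ...
    raise AssertionError("unreachable")


attempts = 0


async def flaky_call() -> str:
    """Fails twice with a transient error, then succeeds."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise TransientError("503 Service Unavailable")
    return "ok"


result = asyncio.run(with_backoff(flaky_call, retries=2, base_delay=0.01))
```

With retries=2 the third attempt succeeds; with retries=1 the same flaky call would raise TransientError after two attempts.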

SSEMCP

Legacy SSE transport. Connects to /sse to receive the endpoint event, then POSTs JSON-RPC requests to the endpoint URL it names; responses arrive on the open SSE stream.

python
from deepcrew.mcp import SSEMCP

async with SSEMCP(
    base_url="http://localhost:3000",
    headers={"X-API-Key": "secret"},
) as mcp:
    tools = await mcp.list_tools()
    result = await mcp.call_tool("my_tool", {"arg": "value"})
Note: SSEMCP depends on httpx-sse, which is included in the default dependencies. If you see an import error, run pip install httpx-sse.

MCPManager

Aggregates multiple MCP clients. Discovers tools from all servers in parallel and routes call_tool() to the right server automatically. Implements MCPClient so it can be passed directly to Agent.mcps.

python
from deepcrew import Agent, run_agent
from deepcrew.mcp import MCPManager, StdioMCP, HTTPMCP

async with MCPManager([
    StdioMCP("npx", ["-y", "@modelcontextprotocol/server-filesystem", "."]),
    HTTPMCP("https://search-mcp.example.com/mcp"),
    HTTPMCP("https://calendar-mcp.example.com/mcp"),
]) as manager:
    tools = await manager.discover_tools()
    print(f"{len(tools)} tools from 3 servers")

    # Pass the manager as a single MCP — it handles routing
    agent = Agent(
        name="super_agent",
        model="openai/gpt-4o",
        system_prompt="Use all available tools to answer questions.",
        mcps=[manager],
    )
    result = await run_agent(agent, [{"role": "user", "content": "..."}])
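The aggregation described above amounts to building a tool-name to server table after parallel discovery. A sketch under assumptions: ToolRouter and FakeServer are hypothetical, the fake servers return bare tool names instead of tool objects, and treating duplicate tool names as an error is one possible policy; the page does not say how MCPManager resolves collisions.

```python
import asyncio
from typing import Any


class FakeServer:
    """Stand-in for an MCP client; real clients return tool objects, not plain names."""

    def __init__(self, label: str, tool_names: list[str]) -> None:
        self.label, self.tool_names = label, tool_names

    async def list_tools(self) -> list[str]:
        return self.tool_names

    async def call_tool(self, name: str, args: dict[str, Any]) -> str:
        return f"{self.label}:{name}({args})"


class ToolRouter:
    """Discover tools from several servers at once, then route calls by tool name."""

    def __init__(self, servers: list[Any]) -> None:
        self.servers = servers
        self.owner: dict[str, Any] = {}

    async def discover_tools(self) -> list[str]:
        # Query every server concurrently
        listings = await asyncio.gather(*(s.list_tools() for s in self.servers))
        for server, names in zip(self.servers, listings):
            for name in names:
                if name in self.owner:  # assumption: name collisions are an error
                    raise ValueError(f"tool {name!r} is exposed by more than one server")
                self.owner[name] = server
        return list(self.owner)

    async def call_tool(self, name: str, args: dict[str, Any]) -> Any:
        return await self.owner[name].call_tool(name, args)


async def main() -> tuple[list[str], str]:
    router = ToolRouter([FakeServer("fs", ["read_file"]), FakeServer("search", ["web_search"])])
    names = await router.discover_tools()
    return names, await router.call_tool("web_search", {"q": "mcp"})


names, reply = asyncio.run(main())
```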
Reference

API Reference

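Signatures of the main public functions and classes. Agent, run_agent, tool, WorkflowBuilder, and Orchestrator are importable from deepcrew; MCP clients live in deepcrew.mcp and event types in deepcrew.types.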

run_agent()

signature
async def run_agent(
    agent: Agent,
    messages: list[dict[str, Any]],
    *,
    tool_defs: list[ToolDef] | None = None,
    queue: asyncio.Queue[StreamEvent | None] | None = None,
    agent_id: str | None = None,
) -> AgentResult

Orchestrator

python
class Orchestrator:
    def __init__(
        self,
        agents: list[Agent],
        router_model: str = "openai/gpt-4o-mini",
        synthesizer_model: str | None = None,
        router_system_prompt: str | None = None,
        synthesizer_system_prompt: str | None = None,
        max_parallel_agents: int = 5,
    ) -> None

    async def run(self, query: str, context: dict | None = None) -> OrchestratorResult
    def stream(self, query: str, context: dict | None = None) -> AsyncGenerator[StreamEvent, None]

WorkflowBuilder

python
class WorkflowBuilder:
    def add_agent(self, name: str, agent: Agent,
                  task: str | Callable = "{input}") -> WorkflowBuilder
    def then(self, predecessor: str, successor: str) -> WorkflowBuilder
    async def run(self, initial_input: str, context: dict | None = None) -> WorkflowResult
    def stream(self, initial_input: str, context: dict | None = None) -> AsyncGenerator[StreamEvent, None]

Result types

python
@dataclass
class AgentResult:
    agent_id: str
    text: str
    tool_calls: list[dict]
    input_tokens: int
    output_tokens: int
    model: str

    @property
    def total_tokens(self) -> int: ...

@dataclass
class WorkflowResult:
    outputs: dict[str, AgentResult]   # node_name → result
    final_output: AgentResult | None
    total_input_tokens: int
    total_output_tokens: int

    @property
    def total_tokens(self) -> int: ...

@dataclass
class OrchestratorResult:
    final_text: str
    agent_results: list[AgentResult]
    router_result: AgentResult | None
    total_input_tokens: int
    total_output_tokens: int

    @property
    def total_tokens(self) -> int: ...

Stream Events

Every event has event (EventType), agent_id (str), and data (dict). Call .to_sse() to encode as a Server-Sent Event string.

Event            data keys                     When emitted
agent_start      model                         run_agent, at loop start
text_delta       chunk                         Each streamed text token
thinking_delta   chunk                         Thinking/reasoning tokens (where supported)
tool_call        tool, args                    Before tool execution
tool_result      tool, result                  After tool execution
agent_done       input_tokens, output_tokens   Agent loop complete
step_start       node                          WorkflowBuilder: node begins
step_done        node                          WorkflowBuilder: node finished
error            message                       Any exception during execution
done             final_text                    Entire run complete
python
from deepcrew.types import EventType, StreamEvent

event = StreamEvent(EventType.TEXT_DELTA, {"chunk": "Hello"}, agent_id="bot")

# SSE string: "event: text_delta\ndata: {"agent_id": "bot", "chunk": "Hello"}\n\n"
print(event.to_sse())

# dict form
print(event.to_dict())
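Clients of an SSE endpoint need to split the stream back into events. The sketch below encodes and parses frames, assuming the layout shown in the to_sse() comment (one event: line plus one JSON data: line per frame). encode_sse and parse_sse are hypothetical helpers, not deepcrew-ai APIs; parse_sse expects a fully buffered body and ignores id:/retry: fields and CRLF line endings, so a streaming client would feed it complete frames as they arrive.

```python
import json
from typing import Any, Iterator


def encode_sse(event: str, data: dict[str, Any]) -> str:
    """One SSE frame: an event line, a single-line JSON data line, then a blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def parse_sse(body: str) -> Iterator[tuple[str, Any]]:
    """Yield (event, data) pairs from a fully buffered text/event-stream body."""
    for frame in body.split("\n\n"):
        if not frame.strip():
            continue
        event, data_lines = "message", []  # "message" is the SSE default event type
        for line in frame.split("\n"):
            field, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value  # drop one leading space
            if field == "event":
                event = value
            elif field == "data":
                data_lines.append(value)
        yield event, (json.loads("\n".join(data_lines)) if data_lines else None)


body = encode_sse("text_delta", {"agent_id": "bot", "chunk": "Hello"}) + encode_sse("done", {})
events = list(parse_sse(body))
```

Parsing that body yields ("text_delta", {"agent_id": "bot", "chunk": "Hello"}) followed by ("done", {}).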
Examples

Full Examples

Multi-step research pipeline

research_pipeline.py
import asyncio
from deepcrew import Agent, WorkflowBuilder
from deepcrew.types import EventType

async def main():
    models = {
        "fast": "openai/gpt-4o-mini",
        "smart": "openai/gpt-4o",
    }

    workflow = (
        WorkflowBuilder()
        .add_agent("outline",  Agent("outline",  model=models["fast"],
                   system_prompt="Create a detailed outline."), task="{input}")
        .add_agent("section1", Agent("section1", model=models["fast"],
                   system_prompt="Write section 1 in depth."),
                   task="Topic: {input}\n\nOutline:\n{outline}\n\nWrite section 1.")
        .add_agent("section2", Agent("section2", model=models["fast"],
                   system_prompt="Write section 2 in depth."),
                   task="Topic: {input}\n\nOutline:\n{outline}\n\nWrite section 2.")
        .add_agent("editor",   Agent("editor",   model=models["smart"],
                   system_prompt="Edit and merge into a cohesive document."),
                   task="Merge and edit:\n\n{section1}\n\n---\n\n{section2}")
        .then("outline", "section1")
        .then("outline", "section2")
        .then("section1", "editor")
        .then("section2", "editor")
    )

    print("Running: outline → [section1 + section2 parallel] → editor\n")
    async for event in workflow.stream("The history of the internet"):
        if event.event == EventType.STEP_START:
            print(f"\n▶ {event.data['node']}")
        elif event.event == EventType.TEXT_DELTA:
            print(event.data["chunk"], end="", flush=True)

asyncio.run(main())

FastAPI + SSE streaming endpoint

server.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from deepcrew import Agent, Orchestrator

app = FastAPI()

orch = Orchestrator([
    Agent("researcher", model="openai/gpt-4o-mini",
          system_prompt="Research any topic thoroughly."),
    Agent("writer",     model="openai/gpt-4o",
          system_prompt="Write clear, engaging content."),
], router_model="openai/gpt-4o-mini", synthesizer_model="openai/gpt-4o")

class ChatRequest(BaseModel):
    message: str

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    async def generate():
        async for event in orch.stream(req.message):
            yield event.to_sse()
        # No manual "done" frame needed: orch.stream() ends with its own done event
    return StreamingResponse(generate(), media_type="text/event-stream",
                             headers={"Cache-Control": "no-cache",
                                      "X-Accel-Buffering": "no"})

MCP filesystem agent

fs_agent.py
import asyncio
from deepcrew import Agent, run_agent
from deepcrew.mcp import StdioMCP

async def main():
    async with StdioMCP("npx", ["-y", "@modelcontextprotocol/server-filesystem", "."]) as mcp:
        agent = Agent(
            name="fs",
            model="openai/gpt-4o",
            system_prompt="Help users manage their files. Always confirm before deleting.",
            mcps=[mcp],
        )
        result = await run_agent(agent, [
            {"role": "user", "content": "Show me all Python files and their sizes."},
        ])
        print(result.text)

asyncio.run(main())