Multi-agent AI,
without the boilerplate.
Build parallel AI workflows using any LLM provider. Attach tools via MCP, stream events in real time, and let agents collaborate — automatically or through explicit DAG workflows.
from deepcrew import Agent, WorkflowBuilder, tool
@tool
def search(query: str) -> str:
    "Search the web."
    return f"Results for: {query}"
researcher = Agent("researcher", model="openai/gpt-4o-mini",
                   system_prompt="Research thoroughly.", tools=[search])
writer = Agent("writer", model="openai/gpt-4o",
               system_prompt="Write clearly.")
result = await (
    WorkflowBuilder()
    .add_agent("research", researcher, task="{input}")
    .add_agent("write", writer, task="Write about:\n{research}")
    .then("research", "write")
    .run("The future of AI agents")
)
print(result.final_output.text)
Parallel by default
Independent agents and tool calls always run concurrently via asyncio.gather — no extra configuration needed.
100+ LLM providers
LiteLLM handles OpenAI, Anthropic, Gemini, Bedrock, Azure, Ollama, and more — same API, zero provider-specific code.
MCP tool integration
Attach tools from any MCP server — subprocess stdio, legacy SSE, or modern streamable-HTTP. Or use plain Python functions.
SSE streaming
Every event — text deltas, tool calls, agent completions — streams in real time. Drop-in compatible with FastAPI StreamingResponse.
Workflow Builder
Declare agent dependencies as a DAG. Independent nodes at each level execute in parallel automatically via topological sort.
Automated routing
Let a router LLM decide which agents to run and in what configuration. When it fans out to multiple agents, a synthesizer merges their results.
Installation
Requires Python 3.11+. Install from PyPI:
pip install deepcrew-ai
# with dev dependencies (pytest, respx)
pip install "deepcrew-ai[dev]"
Set your API key
deepcrew-ai uses LiteLLM under the hood, so you set provider credentials the same way you would for any LiteLLM project — environment variables:
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
export GEMINI_API_KEY="..."
Your first agent
import asyncio
from deepcrew import Agent, run_agent, tool
@tool
def get_weather(city: str) -> dict:
    "Get current weather for a city."
    return {"city": city, "temperature": 22, "condition": "sunny"}
async def main():
    agent = Agent(
        name="assistant",
        model="openai/gpt-4o-mini",
        system_prompt="You are a helpful weather assistant.",
        tools=[get_weather],
    )
    result = await run_agent(
        agent,
        [{"role": "user", "content": "What's the weather in Tokyo?"}],
    )
    print(result.text)
    print(f"Tokens: {result.input_tokens} in / {result.output_tokens} out")
asyncio.run(main())
Agents
An Agent is a dataclass that bundles a model, system prompt, and the tools it can use. Agents are stateless — you pass conversation history into run_agent() each time.
from deepcrew import Agent
from deepcrew.mcp import HTTPMCP
mcp = HTTPMCP("https://my-mcp.example.com/mcp")
agent = Agent(
    name="researcher",
    model="anthropic/claude-opus-4-8",  # any litellm string
    system_prompt="You are a research specialist.",
    mcps=[mcp],                          # MCP tool servers
    tools=[my_fn, another_fn],           # Python @tool functions
    max_turns=10,                        # LLM→tool→LLM cycles
    temperature=0.7,
    max_tokens=4096,
    extra_params={                       # forwarded to litellm verbatim
        "thinking": {"type": "enabled", "budget_tokens": 5000},
    },
)
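Agent parameters
| Parameter | Description |
|---|---|
| name | Agent name. |
| model | Any LiteLLM model string: "openai/gpt-4o", "anthropic/claude-opus-4-8", "gemini/gemini-2.0-flash", etc. |
| system_prompt | System prompt sent to the model. |
| mcps | MCP tool servers whose tools the agent can call. |
| tools | Python functions (@tool-decorated or plain). JSON Schema is auto-generated from type hints. |
| max_turns | Maximum number of LLM→tool→LLM cycles. MaxTurnsError is raised if exceeded. |
| temperature | Sampling temperature. None uses the provider's default. |
| max_tokens | Maximum number of output tokens. None uses the provider's default. |
| extra_params | Extra keyword arguments forwarded verbatim to litellm.acompletion. |
Model strings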
# OpenAI
Agent("a", model="openai/gpt-4o")
Agent("a", model="openai/gpt-4o-mini")
# Anthropic
Agent("a", model="anthropic/claude-opus-4-8")
Agent("a", model="anthropic/claude-haiku-4-5-20251001")
# Google
Agent("a", model="gemini/gemini-2.0-flash")
Agent("a", model="gemini/gemini-2.5-pro")
# AWS Bedrock
Agent("a", model="bedrock/anthropic.claude-opus-4-8-20250514-v1:0")
# Azure OpenAI
Agent("a", model="azure/gpt-4o", extra_params={"api_base": "https://..."})
# Local via Ollama
Agent("a", model="ollama/llama3.2")
Python Function Tools
Decorate any Python function with @tool to make it available as an agent tool. The JSON Schema is generated automatically from type hints.
from deepcrew import tool, Agent, run_agent
@tool
def search_web(query: str, max_results: int = 5) -> list[str]:
    """Search the web and return a list of URLs.
    Args:
        query (str): The search query.
        max_results (int): Maximum number of results to return.
    """
    # your implementation
    return ["https://example.com/..."]
@tool(name="db_lookup", description="Query the internal database")
def lookup_database(table: str, id: int) -> dict:
    return {"id": id, "data": "..."}
agent = Agent(
    name="assistant",
    model="openai/gpt-4o",
    tools=[search_web, lookup_database],
)
Supported types
| Python type | JSON Schema |
|---|---|
| str | {"type": "string"} |
| int | {"type": "integer"} |
| float | {"type": "number"} |
| bool | {"type": "boolean"} |
| list[str] | {"type": "array", "items": {"type": "string"}} |
| dict[str, int] | {"type": "object", "additionalProperties": {"type": "integer"}} |
| Optional[str] | {"type": ["string", "null"]} |
| Literal["a", "b"] | {"enum": ["a", "b"]} |
Parameter descriptions from the docstring's Args section (in the form param (type): description) are automatically extracted and added to each parameter's JSON Schema.
Async tools
Async functions work natively — the runner awaits them automatically:
import httpx
from deepcrew import tool
@tool
async def fetch_url(url: str) -> str:
    "Fetch the content of a URL."
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        return r.text[:2000]
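Supporting both kinds of tool is straightforward to sketch: call the function, and await the result only if it is awaitable. The call_tool helper and the two demo tools below are hypothetical, not part of deepcrew; the example also runs two independent calls together with asyncio.gather.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_tool(fn: Callable[..., Any], args: dict[str, Any]) -> Any:
    """Invoke a sync or async tool and return its result (hypothetical helper)."""
    result = fn(**args)
    if inspect.isawaitable(result):  # coroutine functions return an awaitable
        result = await result
    return result


def add(a: int, b: int) -> int:
    return a + b


async def slow_upper(text: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for network I/O
    return text.upper()


async def main() -> list[Any]:
    # Independent tool calls run concurrently
    return await asyncio.gather(
        call_tool(add, {"a": 2, "b": 3}),
        call_tool(slow_upper, {"text": "hi"}),
    )


results = asyncio.run(main())
print(results)  # [5, 'HI']
```

One caveat of this design: a blocking synchronous tool still runs on the event loop thread and stalls other tasks while it works; wrapping it in asyncio.to_thread is one common way to offload it.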
Runner
run_agent() executes a single agent's agentic loop: call the LLM, buffer streamed tool calls, execute them in parallel, append results, and repeat.
from deepcrew import Agent, run_agent
agent = Agent("assistant", model="openai/gpt-4o")
result = await run_agent(
    agent,
    messages=[{"role": "user", "content": "Hello!"}],
    tool_defs=None,  # optional: pre-fetched ToolDef list
    queue=None,      # optional: asyncio.Queue for streaming events
    agent_id=None,   # optional: override agent_id in events
)
print(result.text) # final text output
print(result.tool_calls) # list of all tool calls made
print(result.input_tokens) # LLM input token count
print(result.output_tokens) # LLM output token count
How the loop works
- Calls litellm.acompletion(..., stream=True) and buffers streamed tool-call deltas by index
- When the stream ends with tool calls buffered, executes all of them in parallel via asyncio.gather
- Appends the assistant message (with tool_calls) and the tool results to the history, then loops
- Exits when the model produces a response with no tool calls, or when max_turns is reached
- Puts each event (text delta, tool call, tool result, etc.) into queue, if one is provided
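The buffering step in the first bullet can be sketched with plain dicts shaped like OpenAI-style streaming deltas (an index, an optional id, and partial function name and arguments strings). This illustrates the technique under that assumption and is not deepcrew's internal code; real streaming chunks arrive as response objects rather than dicts.

```python
import json
from typing import Any


def buffer_tool_calls(deltas: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge streamed tool-call fragments into complete calls, keyed by index."""
    buffers: dict[int, dict[str, Any]] = {}
    for delta in deltas:
        slot = buffers.setdefault(delta["index"], {"id": None, "name": "", "arguments": ""})
        if delta.get("id"):
            slot["id"] = delta["id"]
        fn = delta.get("function") or {}
        if fn.get("name"):
            slot["name"] += fn["name"]
        if fn.get("arguments"):
            slot["arguments"] += fn["arguments"]
    # Arguments are only valid JSON once the stream has ended, so parse them last
    return [
        {"id": b["id"], "name": b["name"], "args": json.loads(b["arguments"] or "{}")}
        for _, b in sorted(buffers.items())
    ]


stream = [
    {"index": 0, "id": "call_a", "function": {"name": "get_weather", "arguments": ""}},
    {"index": 1, "id": "call_b", "function": {"name": "get_weather", "arguments": '{"ci'}},
    {"index": 0, "function": {"arguments": '{"city": "Tok'}},
    {"index": 1, "function": {"arguments": 'ty": "Paris"}'}},
    {"index": 0, "function": {"arguments": 'yo"}'}},
]
calls = buffer_tool_calls(stream)
print([c["args"] for c in calls])  # [{'city': 'Tokyo'}, {'city': 'Paris'}]
```

Keying by index rather than arrival order is what lets fragments of different calls be reassembled correctly when they are interleaved, as in the sample stream.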
Streaming
Every part of deepcrew-ai emits StreamEvent objects. You can consume them as an async generator or encode them as Server-Sent Events.
With FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from deepcrew import Agent, Orchestrator
app = FastAPI()
orch = Orchestrator([
    Agent("assistant", model="openai/gpt-4o", system_prompt="Be helpful."),
])
@app.post("/chat")
async def chat(query: str):
    async def event_stream():
        async for event in orch.stream(query):
            yield event.to_sse()  # "event: text_delta\ndata: {...}\n\n"
    return StreamingResponse(event_stream(), media_type="text/event-stream")
With asyncio.Queue
import asyncio
from deepcrew import Agent, run_agent
from deepcrew.types import EventType
queue = asyncio.Queue()
agent = Agent("bot", model="openai/gpt-4o")
# Run agent in background, reading events in real time
task = asyncio.create_task(
    run_agent(agent, [{"role": "user", "content": "Hi"}], queue=queue)
)
while True:
    event = await queue.get()
    if event is None:
        break
    if event.event == EventType.TEXT_DELTA:
        print(event.data["chunk"], end="", flush=True)
await task
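The consumer loop above assumes the producer puts None on the queue when it finishes. The self-contained sketch below reproduces that sentinel pattern; fake_agent is a hypothetical stand-in for run_agent, and plain dicts stand in for StreamEvent objects.

```python
import asyncio


async def fake_agent(queue: asyncio.Queue) -> str:
    """Hypothetical producer standing in for run_agent: emits chunks, then None."""
    text = ""
    try:
        for chunk in ["Hel", "lo", "!"]:
            await asyncio.sleep(0)  # yield control, as a real LLM stream would
            text += chunk
            await queue.put({"event": "text_delta", "data": {"chunk": chunk}})
    finally:
        await queue.put(None)  # always signal completion, even on error
    return text


async def main() -> tuple[list[str], str]:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(fake_agent(queue))
    chunks = []
    while (event := await queue.get()) is not None:
        chunks.append(event["data"]["chunk"])
    return chunks, await task  # re-raises any exception from the producer


chunks, final_text = asyncio.run(main())
print(chunks, final_text)  # ['Hel', 'lo', '!'] Hello!
```

Putting the sentinel in a finally block keeps the consumer from waiting forever if the producer fails, and awaiting the task afterwards surfaces that failure.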
Workflow Builder
Build explicit agent pipelines as a directed acyclic graph. Nodes at the same dependency level run in parallel automatically.
from deepcrew import Agent, WorkflowBuilder
researcher = Agent("researcher", model="openai/gpt-4o-mini",
                   system_prompt="Research the topic.")
critic = Agent("critic", model="openai/gpt-4o-mini",
               system_prompt="Find gaps and weaknesses.")
expander = Agent("expander", model="openai/gpt-4o-mini",
                 system_prompt="Add technical depth.")
writer = Agent("writer", model="openai/gpt-4o",
               system_prompt="Write a polished report.")
workflow = (
    WorkflowBuilder()
    .add_agent("research", researcher, task="{input}")
    .add_agent("critique", critic, task="Critique:\n{research}")
    .add_agent("expand", expander, task="Expand:\n{research}")
    .add_agent("report", writer,
               task="Write a report.\n\n"
                    "Research:\n{research}\n\n"
                    "Critique:\n{critique}\n\n"
                    "Details:\n{expand}")
    .then("research", "critique")  # critique waits for research
    .then("research", "expand")    # expand waits for research
    .then("critique", "report")    # report waits for both
    .then("expand", "report")
)
# research → [critique + expand in parallel] → report
result = await workflow.run("The future of renewable energy")
print(result.final_output.text)
print(f"Total tokens: {result.total_tokens}")
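Level-by-level scheduling of a DAG like this can be sketched with the standard library's graphlib.TopologicalSorter plus asyncio.gather. The levels and run_levels helpers and the fake_agent stand-in are hypothetical; deepcrew's actual scheduler may differ.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Awaitable, Callable


def levels(nodes: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Group DAG nodes into levels; nodes in one level do not depend on each other."""
    graph: dict[str, set[str]] = {n: set() for n in nodes}
    for pred, succ in edges:
        graph[succ].add(pred)  # TopologicalSorter maps node -> its predecessors
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    plan = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        plan.append(ready)
        sorter.done(*ready)
    return plan


async def run_levels(plan: list[list[str]],
                     run_node: Callable[[str, dict[str, str]], Awaitable[str]]) -> dict[str, str]:
    """Run each level with asyncio.gather; later levels see earlier outputs."""
    outputs: dict[str, str] = {}
    for level in plan:
        results = await asyncio.gather(*(run_node(n, outputs) for n in level))
        outputs.update(zip(level, results))
    return outputs


async def fake_agent(node: str, outputs: dict[str, str]) -> str:
    await asyncio.sleep(0.01)  # hypothetical stand-in for an LLM call
    return f"{node} (saw: {', '.join(sorted(outputs)) or 'nothing'})"


edges = [("research", "critique"), ("research", "expand"),
         ("critique", "report"), ("expand", "report")]
plan = levels(["research", "critique", "expand", "report"], edges)
print(plan)  # [['research'], ['critique', 'expand'], ['report']]
outputs = asyncio.run(run_levels(plan, fake_agent))
print(outputs["report"])  # report (saw: critique, expand, research)
```

Grouping by level is simple but conservative: a node waits for every node in the previous level, not only its own predecessors. Starting each node as soon as its own predecessors finish is the usual alternative when branches have very different latencies.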
Task templates
The task argument is a Python format string where {input} is the initial query and {node_name} is the text output of any predecessor node. Or pass a callable:
# String template
.add_agent("writer", agent, task="Write about:\n{research}\n\nContext:\n{expand}")
# Callable — receives a dict with "input" and all predecessor outputs
.add_agent("writer", agent, task=lambda ctx: (
    f"Topic: {ctx['input']}\n\n"
    f"Research ({len(ctx['research'])} chars): {ctx['research'][:500]}..."
))
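One plausible way to resolve the two task forms is sketched below; render_task is a hypothetical helper, not deepcrew's API. A string is filled with str.format using the initial input plus the predecessor outputs, while a callable receives the same values as a dict.

```python
from typing import Callable, Union

# A task is either a str.format template or a function of the context dict
Task = Union[str, Callable[[dict[str, str]], str]]


def render_task(task: Task, initial_input: str, outputs: dict[str, str]) -> str:
    """Build a node's prompt from its task (hypothetical helper)."""
    ctx = {**outputs, "input": initial_input}
    if callable(task):
        return task(ctx)
    # Raises KeyError if the template names a node whose output is not available
    return task.format(**ctx)


outputs = {"research": "Solar costs fell.", "expand": "Storage matters."}
template_prompt = render_task("Write about:\n{research}\n\nContext:\n{expand}",
                              "Energy", outputs)
callable_prompt = render_task(
    lambda ctx: f"Topic: {ctx['input']} ({len(ctx['research'])} chars)",
    "Energy", outputs,
)
print(callable_prompt)  # Topic: Energy (17 chars)
```

Because str.format parses only the template, braces inside predecessor outputs are safe; literal braces in the template itself must be doubled ({{ and }}).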
Streaming workflow
from deepcrew.types import EventType
async for event in workflow.stream("My topic"):
    if event.event == EventType.STEP_START:
        print(f"▶ {event.data['node']} starting...")
    elif event.event == EventType.TEXT_DELTA:
        print(event.data["chunk"], end="", flush=True)
    elif event.event == EventType.STEP_DONE:
        print(f"\n✓ {event.data['node']} done")
    elif event.event == EventType.DONE:
        print(f"\n\nFinal: {event.data['final_text']}")
Orchestrator
The automated mode: a router LLM analyzes your query and the available agents, decides whether to use one agent or fan out to multiple agents running in parallel, then a synthesizer merges the results.
from deepcrew import Agent, Orchestrator
agents = [
    Agent("researcher", model="openai/gpt-4o-mini",
          system_prompt="Gather comprehensive facts and background."),
    Agent("analyst", model="openai/gpt-4o-mini",
          system_prompt="Evaluate trends, risks, and opportunities."),
    Agent("writer", model="openai/gpt-4o-mini",
          system_prompt="Produce clear, engaging written content."),
]
orch = Orchestrator(
    agents=agents,
    router_model="openai/gpt-4o-mini",
    synthesizer_model="openai/gpt-4o",
    max_parallel_agents=5,
)
# Non-streaming
result = await orch.run("What are the key challenges of AI in healthcare?")
print(result.final_text)
print(result.total_tokens)
# Streaming
async for event in orch.stream("..."):
    print(event.to_sse(), end="")
How routing works
The router calls router_model with a JSON-mode prompt listing all agents. The model returns either:
{
"route": "single",
"agent": "researcher",
"task": "Research quantum computing trends"
}
{
"route": "parallel",
"agents": [
{"name": "researcher", "task": "Gather facts"},
{"name": "analyst", "task": "Evaluate trends"}
]
}
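Turning either reply shape into a list of (agent, task) assignments, and rejecting agents the router made up, can be sketched as follows. Assignment and parse_route are hypothetical names, and capping the fan-out at max_parallel is an assumption about how a limit like max_parallel_agents might be applied, not documented deepcrew behaviour.

```python
import json
from dataclasses import dataclass


@dataclass
class Assignment:
    name: str
    task: str


def parse_route(raw: str, known_agents: set[str], max_parallel: int = 5) -> list[Assignment]:
    """Convert the router's JSON reply into agent assignments (hypothetical helper)."""
    decision = json.loads(raw)
    route = decision.get("route")
    if route == "single":
        plan = [Assignment(decision["agent"], decision["task"])]
    elif route == "parallel":
        plan = [Assignment(a["name"], a["task"]) for a in decision["agents"]]
    else:
        raise ValueError(f"unknown route: {route!r}")
    unknown = [a.name for a in plan if a.name not in known_agents]
    if unknown:
        raise ValueError(f"router chose unknown agents: {unknown}")
    return plan[:max_parallel]  # assumed cap on fan-out


plan = parse_route(
    '{"route": "parallel", "agents": ['
    '{"name": "researcher", "task": "Gather facts"}, '
    '{"name": "analyst", "task": "Evaluate trends"}]}',
    known_agents={"researcher", "analyst", "writer"},
)
print([(a.name, a.task) for a in plan])  # [('researcher', 'Gather facts'), ('analyst', 'Evaluate trends')]
```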
Custom prompts
orch = Orchestrator(
    agents=agents,
    router_model="openai/gpt-4o-mini",
    router_system_prompt="Your custom routing instructions...\n\nAgents:\n{agent_descriptions}",
    synthesizer_system_prompt="Merge these results into a single answer.",
)
Model Context Protocol
deepcrew-ai implements all three MCP transport types. Attach any combination to an agent via mcps=[...]. Tools are discovered in parallel on first use.
All of the classes below implement the MCPClient abstract base and support the async context manager protocol (async with StdioMCP(...) as mcp:).
| Transport | Class | Use case |
|---|---|---|
| stdio | StdioMCP | Subprocess MCP servers (npx, python, etc.) |
| streamable-HTTP | HTTPMCP | Remote MCP servers, modern protocol (spec 2025-03-26+) |
| SSE | SSEMCP | Legacy /sse + /messages protocol servers |
| multi | MCPManager | Aggregate any mix of the above into one interface |
StdioMCP
Spawns a subprocess and communicates via JSON-RPC over stdin/stdout. The standard transport for MCP servers distributed as npm packages or Python scripts.
from deepcrew import Agent, run_agent
from deepcrew.mcp import StdioMCP
# Official filesystem MCP server (Node.js / npx)
async with StdioMCP(
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "."],
    env={"SOME_VAR": "value"},  # merged with current environment
) as mcp:
    tools = await mcp.list_tools()
    print([t.name for t in tools])
    agent = Agent(
        name="file_agent",
        model="openai/gpt-4o",
        system_prompt="Help users with file operations.",
        mcps=[mcp],
    )
    result = await run_agent(
        agent,
        [{"role": "user", "content": "List the files in the current directory."}],
    )
    print(result.text)
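Constructor parameters
| Parameter | Description |
|---|---|
| command | Executable that starts the server, e.g. "npx", "python", "uvx". |
| args | Arguments passed to command. |
| env | Extra environment variables, merged with the current environment. |
HTTPMCP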
Modern streamable-HTTP transport (MCP spec 2025-03-26+). Manages the Mcp-Session-Id header, automatically re-initializes when a session expires, and retries transient errors with exponential backoff.
from deepcrew.mcp import HTTPMCP
async with HTTPMCP(
    url="https://my-mcp.example.com/mcp",
    headers={"Authorization": "Bearer sk-..."},
    timeout=30,
    retries=2,
) as mcp:
    tools = await mcp.list_tools()
    result = await mcp.call_tool("search", {"query": "hello world"})
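Retrying transient errors with exponential backoff can be sketched independently of any HTTP client. In this sketch, with_retries and TransientError are hypothetical, and reading retries=2 as "two extra attempts after the first" is an assumption rather than documented HTTPMCP semantics.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for retryable failures such as timeouts or HTTP 503."""


async def with_retries(op: Callable[[], Awaitable[T]], retries: int = 2,
                       base_delay: float = 0.5) -> T:
    """Call op; on TransientError retry up to `retries` more times with backoff."""
    for attempt in range(retries + 1):
        try:
            return await op()
        except TransientError:
            if attempt == retries:
                raise  # out of retries: surface the last error
            delay = base_delay * 2 ** attempt  # 0.5s, 1s, 2s, ...
            await asyncio.sleep(delay + random.uniform(0, delay / 10))  # plus jitter
    raise AssertionError("unreachable")


calls = {"n": 0}


async def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("503 Service Unavailable")
    return "ok"


result = asyncio.run(with_retries(flaky, retries=2, base_delay=0.01))
print(result, calls["n"])  # ok 3
```

The random jitter spreads out retries from many clients that failed at the same moment, so they do not all hit a recovering server in lockstep.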
SSEMCP
Legacy SSE transport. Connects to /sse to receive the endpoint event, then POSTs JSON-RPC requests to that endpoint.
from deepcrew.mcp import SSEMCP
async with SSEMCP(
    base_url="http://localhost:3000",
    headers={"X-API-Key": "secret"},
) as mcp:
    tools = await mcp.list_tools()
    result = await mcp.call_tool("my_tool", {"arg": "value"})
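In the legacy transport, the first event on the /sse stream is named endpoint, and its data is the URI to which the client POSTs JSON-RPC requests. The parser below is a minimal sketch of Server-Sent Events framing (event: and data: fields, blank-line dispatch, : comment lines) applied to that handshake; parse_sse is a hypothetical helper, and the session_id query string in the sample is made up for illustration.

```python
from urllib.parse import urljoin


def parse_sse(text: str) -> list[tuple[str, str]]:
    """Split an SSE stream into (event, data) pairs (minimal sketch of the framing)."""
    events: list[tuple[str, str]] = []
    event_type, data_lines = "message", []
    for line in text.splitlines():
        if not line:
            # A blank line dispatches the pending event, if it carried any data
            if data_lines:
                events.append((event_type, "\n".join(data_lines)))
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            name, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]
            if name == "event":
                event_type = value
            elif name == "data":
                data_lines.append(value)
    return events


raw = (
    "event: endpoint\n"
    "data: /messages?session_id=abc123\n"
    "\n"
    ": keep-alive\n"
    "\n"
    "event: message\n"
    'data: {"jsonrpc": "2.0", "id": 1, "result": {}}\n'
    "\n"
)
events = parse_sse(raw)
endpoint = next(data for name, data in events if name == "endpoint")
post_url = urljoin("http://localhost:3000/sse", endpoint)
print(post_url)  # http://localhost:3000/messages?session_id=abc123
```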
httpx-sse is included in the default dependencies. If you see an import error, run pip install httpx-sse.
MCPManager
Aggregates multiple MCP clients. Discovers tools from all servers in parallel and routes call_tool() to the right server automatically. Implements MCPClient so it can be passed directly to Agent.mcps.
from deepcrew import Agent, run_agent
from deepcrew.mcp import MCPManager, StdioMCP, HTTPMCP
async with MCPManager([
    StdioMCP("npx", ["-y", "@modelcontextprotocol/server-filesystem", "."]),
    HTTPMCP("https://search-mcp.example.com/mcp"),
    HTTPMCP("https://calendar-mcp.example.com/mcp"),
]) as manager:
    tools = await manager.discover_tools()
    print(f"{len(tools)} tools from 3 servers")
    # Pass the manager as a single MCP; it handles routing
    agent = Agent(
        name="super_agent",
        model="openai/gpt-4o",
        system_prompt="Use all available tools to answer questions.",
        mcps=[manager],
    )
    result = await run_agent(agent, [{"role": "user", "content": "..."}])
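The aggregation idea can be sketched in a few lines: list tools from every server concurrently, record which server owns each tool name, and dispatch calls through that map. FakeServer and ToolRouter are hypothetical stand-ins, not deepcrew classes, and tool listings are simplified to plain names.

```python
import asyncio


class FakeServer:
    """Hypothetical stand-in for an MCP client exposing list_tools/call_tool."""

    def __init__(self, name: str, tools: list[str]) -> None:
        self.name, self.tools = name, tools

    async def list_tools(self) -> list[str]:
        await asyncio.sleep(0.01)  # simulated network round-trip
        return self.tools

    async def call_tool(self, tool: str, args: dict) -> str:
        return f"{self.name} handled {tool}({args})"


class ToolRouter:
    """Discover tools from every server concurrently, then dispatch calls by tool name."""

    def __init__(self, servers: list[FakeServer]) -> None:
        self.servers = servers
        self.routes: dict[str, FakeServer] = {}

    async def discover_tools(self) -> list[str]:
        listings = await asyncio.gather(*(s.list_tools() for s in self.servers))
        for server, tools in zip(self.servers, listings):
            for tool in tools:
                if tool in self.routes:  # this sketch rejects ambiguous names
                    raise ValueError(f"tool {tool!r} is exposed by more than one server")
                self.routes[tool] = server
        return list(self.routes)

    async def call_tool(self, tool: str, args: dict) -> str:
        return await self.routes[tool].call_tool(tool, args)


async def main() -> tuple[list[str], str]:
    router = ToolRouter([FakeServer("fs", ["read_file", "list_dir"]),
                         FakeServer("search", ["web_search"])])
    names = await router.discover_tools()
    reply = await router.call_tool("web_search", {"q": "mcp"})
    return names, reply


names, reply = asyncio.run(main())
print(names)  # ['read_file', 'list_dir', 'web_search']
print(reply)  # search handled web_search({'q': 'mcp'})
```

Rejecting duplicate tool names is one design choice; prefixing names with a server label, or letting the first server win, are common alternatives.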
API Reference
All public names exported from import deepcrew.
run_agent()
async def run_agent(
    agent: Agent,
    messages: list[dict[str, Any]],
    *,
    tool_defs: list[ToolDef] | None = None,
    queue: asyncio.Queue[StreamEvent | None] | None = None,
    agent_id: str | None = None,
) -> AgentResult
Orchestrator
class Orchestrator:
    def __init__(
        self,
        agents: list[Agent],
        router_model: str = "openai/gpt-4o-mini",
        synthesizer_model: str | None = None,
        router_system_prompt: str | None = None,
        synthesizer_system_prompt: str | None = None,
        max_parallel_agents: int = 5,
    ) -> None
    async def run(self, query: str, context: dict | None = None) -> OrchestratorResult
    def stream(self, query: str, context: dict | None = None) -> AsyncGenerator[StreamEvent, None]
WorkflowBuilder
class WorkflowBuilder:
    def add_agent(self, name: str, agent: Agent,
                  task: str | Callable = "{input}") -> WorkflowBuilder
    def then(self, predecessor: str, successor: str) -> WorkflowBuilder
    async def run(self, initial_input: str, context: dict | None = None) -> WorkflowResult
    def stream(self, initial_input: str, context: dict | None = None) -> AsyncGenerator[StreamEvent, None]
Result types
@dataclass
class AgentResult:
    agent_id: str
    text: str
    tool_calls: list[dict]
    input_tokens: int
    output_tokens: int
    model: str
    total_tokens: int  # property
@dataclass
class WorkflowResult:
    outputs: dict[str, AgentResult]  # node_name → result
    final_output: AgentResult | None
    total_input_tokens: int
    total_output_tokens: int
    total_tokens: int  # property
@dataclass
class OrchestratorResult:
    final_text: str
    agent_results: list[AgentResult]
    router_result: AgentResult | None
    total_input_tokens: int
    total_output_tokens: int
    total_tokens: int  # property
Stream Events
Every event has event (EventType), agent_id (str), and data (dict). Call .to_sse() to encode as a Server-Sent Event string.
| Event | data keys | Emitted by |
|---|---|---|
| agent_start | model | run_agent at loop start |
| text_delta | chunk | Each streamed text token |
| thinking_delta | chunk | Thinking/reasoning tokens (where supported) |
| tool_call | tool, args | Before tool execution |
| tool_result | tool, result | After tool execution |
| agent_done | input_tokens, output_tokens | Agent loop complete |
| step_start | node | WorkflowBuilder: node begins |
| step_done | node | WorkflowBuilder: node finished |
| error | message | Any exception during execution |
| done | final_text | Entire run complete |
from deepcrew.types import EventType, StreamEvent
event = StreamEvent(EventType.TEXT_DELTA, {"chunk": "Hello"}, agent_id="bot")
# SSE string: "event: text_delta\ndata: {"agent_id": "bot", "chunk": "Hello"}\n\n"
print(event.to_sse())
# dict form
print(event.to_dict())
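The SSE string in the comment above can be reproduced with json.dumps: the event name goes on the event: line, and agent_id is merged with the data dict on a single data: line. MiniEvent and EventKind below are hypothetical stand-ins, not deepcrew's StreamEvent and EventType.

```python
import json
from dataclasses import dataclass, field
from enum import Enum


class EventKind(str, Enum):
    """Hypothetical subset of the event names listed in the table."""
    TEXT_DELTA = "text_delta"
    DONE = "done"


@dataclass
class MiniEvent:
    event: EventKind
    data: dict = field(default_factory=dict)
    agent_id: str = ""

    def to_sse(self) -> str:
        # json.dumps escapes newlines, so the payload always fits on one data: line
        payload = {"agent_id": self.agent_id, **self.data}
        return f"event: {self.event.value}\ndata: {json.dumps(payload)}\n\n"


sse = MiniEvent(EventKind.TEXT_DELTA, {"chunk": "Hello"}, agent_id="bot").to_sse()
print(repr(sse))
```

Keeping the JSON on one line matters because a raw newline inside a text chunk would otherwise end the data: field early and corrupt the event.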
Full Examples
Multi-step research pipeline
import asyncio
from deepcrew import Agent, WorkflowBuilder
from deepcrew.types import EventType
async def main():
    models = {
        "fast": "openai/gpt-4o-mini",
        "smart": "openai/gpt-4o",
    }
    workflow = (
        WorkflowBuilder()
        .add_agent("outline", Agent("outline", model=models["fast"],
                                    system_prompt="Create a detailed outline."),
                   task="{input}")
        .add_agent("section1", Agent("section1", model=models["fast"],
                                     system_prompt="Write section 1 in depth."),
                   task="Topic: {input}\n\nOutline:\n{outline}\n\nWrite section 1.")
        .add_agent("section2", Agent("section2", model=models["fast"],
                                     system_prompt="Write section 2 in depth."),
                   task="Topic: {input}\n\nOutline:\n{outline}\n\nWrite section 2.")
        .add_agent("editor", Agent("editor", model=models["smart"],
                                   system_prompt="Edit and merge into a cohesive document."),
                   task="Merge and edit:\n\n{section1}\n\n---\n\n{section2}")
        .then("outline", "section1")
        .then("outline", "section2")
        .then("section1", "editor")
        .then("section2", "editor")
    )
    print("Running: outline → [section1 + section2 parallel] → editor\n")
    async for event in workflow.stream("The history of the internet"):
        if event.event == EventType.STEP_START:
            print(f"\n▶ {event.data['node']}")
        elif event.event == EventType.TEXT_DELTA:
            print(event.data["chunk"], end="", flush=True)
asyncio.run(main())
FastAPI + SSE streaming endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from deepcrew import Agent, Orchestrator
app = FastAPI()
orch = Orchestrator([
    Agent("researcher", model="openai/gpt-4o-mini",
          system_prompt="Research any topic thoroughly."),
    Agent("writer", model="openai/gpt-4o",
          system_prompt="Write clear, engaging content."),
], router_model="openai/gpt-4o-mini", synthesizer_model="openai/gpt-4o")
class ChatRequest(BaseModel):
    message: str
@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    async def generate():
        async for event in orch.stream(req.message):
            yield event.to_sse()
        yield "event: done\ndata: {}\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream",
                             headers={"Cache-Control": "no-cache",
                                      "X-Accel-Buffering": "no"})
MCP filesystem agent
import asyncio
from deepcrew import Agent, run_agent
from deepcrew.mcp import StdioMCP
async def main():
    async with StdioMCP("npx", ["-y", "@modelcontextprotocol/server-filesystem", "."]) as mcp:
        agent = Agent(
            name="fs",
            model="openai/gpt-4o",
            system_prompt="Help users manage their files. Always confirm before deleting.",
            mcps=[mcp],
        )
        result = await run_agent(agent, [
            {"role": "user", "content": "Show me all Python files and their sizes."},
        ])
        print(result.text)
asyncio.run(main())