Multi-agent AI, without the boilerplate.
Build parallel AI workflows with any LLM provider. APEX synthesis, intelligent agent spawning, iterative looping, skills, memory, and OpenTelemetry observability — all async-first.
from deepcrew import Agent, Orchestrator, ApexConfig
researcher = Agent("researcher", model="openai/gpt-4o-mini",
                   system_prompt="Research thoroughly.")
analyst = Agent("analyst", model="anthropic/claude-haiku-4-5-20251001",
                system_prompt="Analyze critically.")
writer = Agent("writer", model="gemini/gemini-2.0-flash",
               system_prompt="Write clearly.")
orch = Orchestrator(
    agents=[researcher, analyst, writer],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(cite_sources=True),
)
result = await orch.run("Future of renewable energy")
print(result.final_text)
print(f"Confidence: {result.agent_results[-1].confidence:.2f}")
Parallel by default
Independent agents and tool calls always run concurrently via asyncio.gather — no extra configuration needed.
100+ LLM providers
LiteLLM handles OpenAI, Anthropic, Gemini, Bedrock, Azure, Ollama — same API, zero provider-specific code.
APEX synthesis
Confidence-scored, citation-aware multi-agent synthesis engine. Know how sure your AI is about its answer.
Agent spawning
Agents can spawn sub-agents mid-loop (enable_spawn=True on the Orchestrator), and a router model decides which tools each sub-agent gets. Claude Code-style dynamic orchestration.
Skills + MCP tools
Reusable capability bundles via @skill, built-ins like WebSearchSkill, and MCP stdio/HTTP/SSE transports.
Observability
OpenTelemetry spans for every LLM call, tool execution, and workflow step. Zero overhead when disabled.
Memory providers
Pluggable short-term and persistent context stores (InMemoryProvider, FileMemoryProvider). Relevant memories are injected into the agent's context before the first LLM call.
Retry & fallback
Per-agent exponential backoff with configurable model fallback chains, so transient errors are retried instead of failing the run.
Installation
Requires Python 3.11+. Install from PyPI:
# Core library
pip install deepcrew-ai
# With OpenTelemetry support
pip install "deepcrew-ai[otel]"
# With dev dependencies (pytest, respx)
pip install "deepcrew-ai[dev]"
Set your API keys
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
export GEMINI_API_KEY="AIza..."
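export AWS_ACCESS_KEY_ID="..." # for Bedrock
export AWS_SECRET_ACCESS_KEY="..." # for Bedrock
export AWS_REGION_NAME="..." # for Bedrock
export AZURE_API_KEY="..." # for Azure OpenAI
export AZURE_API_BASE="..." # for Azure OpenAI
export AZURE_API_VERSION="..." # for Azure OpenAI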
Your first agent
import asyncio
from deepcrew import Agent, run_agent, tool
@tool
def get_weather(city: str) -> dict:
    "Get current weather for a city."
    return {"city": city, "temperature": 22, "condition": "sunny"}
async def main():
    agent = Agent(
        name="assistant",
        model="openai/gpt-4o-mini",
        system_prompt="You are a helpful weather assistant.",
        tools=[get_weather],
    )
    result = await run_agent(
        agent,
        [{"role": "user", "content": "What's the weather in Tokyo?"}],
    )
    print(result.text)
    print(f"Tokens: {result.input_tokens} in / {result.output_tokens} out")
asyncio.run(main())
Agents
An Agent is a dataclass that bundles a model, system prompt, and tools. Agents are stateless — pass conversation history into run_agent() each call.
from deepcrew import Agent, RetryPolicy, FallbackChain, LoopConfig, InMemoryProvider
from deepcrew import WebSearchSkill
from deepcrew.mcp import HTTPMCP
agent = Agent(
    name="researcher",
    model="anthropic/claude-opus-4-8",
    system_prompt="You are a research specialist.",
    mcps=[HTTPMCP("https://my-mcp.example.com/mcp")],
    tools=[my_fn],                              # my_fn: any @tool-decorated function you define
    skills=[WebSearchSkill()],                  # v0.2.0: reusable skill bundles
    memory=InMemoryProvider(),                  # v0.2.0: context persistence
    retry_policy=RetryPolicy(max_retries=3),    # v0.2.0: retry on failure
    fallback_chain=FallbackChain([              # v0.2.0: model fallbacks
        "openai/gpt-4o",
        "gemini/gemini-2.0-flash",
    ]),
    loop_config=LoopConfig(max_iterations=3),   # v0.2.0: outer refinement loop
    max_turns=10,
    temperature=0.7,
    max_tokens=4096,
)
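All Agent parameters
| Parameter | Description |
|---|---|
| model | "openai/gpt-4o", "anthropic/claude-opus-4-8", "gemini/gemini-2.0-flash", etc. |
| tools | Functions decorated with @tool. JSON Schema auto-generated from type hints. |
| max_turns | MaxTurnsError raised if exceeded. |
| temperature | None uses the provider default. |
| max_tokens | None uses the provider default. |
| … | …litellm.acompletion. |
Python Function Tools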
Decorate any Python function with @tool. JSON Schema is generated automatically from type hints.
from deepcrew import tool, Agent
@tool
def search_web(query: str, max_results: int = 5) -> list[str]:
    """Search the web and return URLs.
    Args:
        query (str): The search query.
        max_results (int): Maximum results to return.
    """
    return ["https://example.com/..."]
@tool(name="db_lookup", description="Query the internal database")
def lookup_database(table: str, id: int) -> dict:
    return {"id": id, "data": "..."}
@tool
async def fetch_url(url: str) -> str:
    "Fetch a URL and return the first 3,000 characters of its HTML."
    import httpx
    async with httpx.AsyncClient() as client:
        r = await client.get(url, timeout=10.0)
        return r.text[:3000]
agent = Agent("assistant", model="openai/gpt-4o",
              tools=[search_web, lookup_database, fetch_url])
Supported parameter types
| Python type | JSON Schema |
|---|---|
| str | "string" |
| int | "integer" |
| float | "number" |
| bool | "boolean" |
| list[str] | {"type":"array","items":{"type":"string"}} |
| dict[str, int] | {"type":"object","additionalProperties":{"type":"integer"}} |
| Optional[str] | {"type":["string","null"]} |
| Literal["a","b"] | {"enum":["a","b"]} |
Runner
run_agent() executes a single agent's agentic loop: call LLM → buffer tool calls → execute in parallel → append results → repeat.
from deepcrew import Agent, run_agent, ObservabilityConfig
agent = Agent("assistant", model="openai/gpt-4o")
obs = ObservabilityConfig(otel_endpoint="http://localhost:4317")
result = await run_agent(
    agent,
    messages=[{"role": "user", "content": "Hello!"}],
    tool_defs=None,      # optional: pre-fetched ToolDef list
    queue=None,          # optional: asyncio.Queue for streaming
    agent_id=None,       # optional: override agent_id in events
    observability=obs,   # v0.2.0: optional OTel config
)
print(result.text)
print(result.confidence) # v0.2.0: populated by APEX (None when direct)
print(result.loop_iterations) # v0.2.0: outer loop count
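The call LLM → execute tools in parallel → append results → repeat cycle can be modeled without any network calls. This is a simplified, hypothetical sketch, not deepcrew-ai's runner: llm is any async callable standing in for litellm.acompletion, tool calls are flattened to plain dicts with id, name, and JSON-encoded arguments, and MaxTurnsError is a local class that only mirrors the name used in these docs.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

Message = dict[str, Any]
LLM = Callable[[list[Message]], Awaitable[Message]]
Tool = Callable[..., Awaitable[Any]]


class MaxTurnsError(RuntimeError):
    """Local stand-in mirroring the documented error name."""


async def agent_loop(llm: LLM, tools: dict[str, Tool], messages: list[Message],
                     max_turns: int = 10) -> str:
    history = list(messages)  # copy, so the caller's list is left untouched
    for _ in range(max_turns):
        reply = await llm(history)  # call the model
        history.append(reply)
        calls = reply.get("tool_calls") or []
        if not calls:  # no tool calls: the model has produced its answer
            return reply.get("content") or ""
        # Run every tool call from this turn concurrently.
        results = await asyncio.gather(
            *(tools[c["name"]](**json.loads(c["arguments"])) for c in calls)
        )
        # Append each result, keyed by its call id, then loop again.
        for call, result in zip(calls, results):
            history.append({"role": "tool", "tool_call_id": call["id"],
                            "content": json.dumps(result)})
    raise MaxTurnsError(f"no final answer within {max_turns} turns")


# Demo with a scripted fake model.
async def get_weather(city: str) -> dict:
    await asyncio.sleep(0.01)
    return {"city": city, "temperature": 22}


async def fake_llm(history: list[Message]) -> Message:
    if history[-1]["role"] == "user":  # first turn: request two tools at once
        return {"role": "assistant", "content": None, "tool_calls": [
            {"id": "1", "name": "get_weather", "arguments": '{"city": "Tokyo"}'},
            {"id": "2", "name": "get_weather", "arguments": '{"city": "Oslo"}'},
        ]}
    temps = [json.loads(m["content"])["temperature"] for m in history if m["role"] == "tool"]
    return {"role": "assistant", "content": f"Temperatures: {temps}"}


answer = asyncio.run(agent_loop(fake_llm, {"get_weather": get_weather},
                                [{"role": "user", "content": "Weather?"}]))
print(answer)  # Temperatures: [22, 22]
```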
Loop behaviour
- Streams via litellm.acompletion(stream=True) and buffers tool call deltas by index
- Executes all tool calls in a single turn in parallel via asyncio.gather
- Injects relevant memories before the first LLM call (if agent.memory is set)
- Wraps the LLM call with retry/fallback logic (if agent.retry_policy or agent.fallback_chain is set)
- Delegates to run_agent_loop() transparently when agent.loop_config is set
- Exits when the model produces no tool calls, or raises MaxTurnsError when max_turns is reached
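Buffering tool call deltas by index matters because a streamed response delivers each tool call's JSON arguments in fragments, and fragments of different calls can interleave. The sketch below assumes OpenAI-style delta entries (an index, an id on the first fragment, and function.name / function.arguments pieces), represented as plain dicts. buffer_tool_call_deltas is a hypothetical helper, not deepcrew-ai code.

```python
import json
from typing import Any


def buffer_tool_call_deltas(deltas: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Reassemble streamed tool-call fragments into complete calls, keyed by index."""
    slots: dict[int, dict[str, Any]] = {}
    for delta in deltas:
        slot = slots.setdefault(delta["index"], {"id": None, "name": "", "arguments": ""})
        if delta.get("id"):
            slot["id"] = delta["id"]
        fn = delta.get("function") or {}
        slot["name"] += fn.get("name") or ""
        slot["arguments"] += fn.get("arguments") or ""  # JSON text arrives in pieces
    calls = [slots[i] for i in sorted(slots)]
    for call in calls:
        # Parse only once the stream is complete; partial JSON would not parse.
        call["arguments"] = json.loads(call["arguments"] or "{}")
    return calls


# Fragments of two parallel calls, interleaved as they might arrive from a stream.
stream = [
    {"index": 0, "id": "call_a", "function": {"name": "get_weather", "arguments": ""}},
    {"index": 1, "id": "call_b", "function": {"name": "get_weather", "arguments": '{"ci'}},
    {"index": 0, "function": {"arguments": '{"city": "Tok'}},
    {"index": 1, "function": {"arguments": 'ty": "Oslo"}'}},
    {"index": 0, "function": {"arguments": 'yo"}'}},
]
for call in buffer_tool_call_deltas(stream):
    print(call)
```

Once reassembled, the calls from one turn are independent of each other, which is what makes the parallel asyncio.gather step safe.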
Streaming
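Every part of deepcrew-ai (run_agent, WorkflowBuilder, and Orchestrator) emits StreamEvent objects. Consume them as an async generator via stream(), read them from an asyncio.Queue passed to run_agent(), or encode them as Server-Sent Events for web clients.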
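A Server-Sent Event is plain text: an event: line naming the event, a data: line carrying the payload, and a blank line terminating it. The sketch below shows one way a to_sse() method could produce strings of the "event: text_delta\ndata: {...}\n\n" form. SketchEvent is a hypothetical stand-in for StreamEvent, and placing agent_id inside the JSON payload is an assumption.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchEvent:
    """Hypothetical stand-in for StreamEvent: event name, agent id, payload."""
    event: str
    agent_id: str
    data: dict[str, Any] = field(default_factory=dict)

    def to_sse(self) -> str:
        # json.dumps escapes newlines inside strings, so the payload stays on
        # a single data: line and cannot break the SSE framing.
        payload = json.dumps({"agent_id": self.agent_id, **self.data})
        return f"event: {self.event}\ndata: {payload}\n\n"


print(SketchEvent("text_delta", "writer", {"chunk": "Hi"}).to_sse(), end="")
```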
With FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from deepcrew import Agent, Orchestrator, ApexConfig
app = FastAPI()
orch = Orchestrator(
    agents=[Agent("assistant", model="openai/gpt-4o", system_prompt="Be helpful.")],
    apex_config=ApexConfig(cite_sources=True),
)
@app.post("/chat")
async def chat(query: str):
    async def event_stream():
        async for event in orch.stream(query):
            yield event.to_sse()  # "event: text_delta\ndata: {...}\n\n"
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
With asyncio.Queue
import asyncio
from deepcrew import Agent, run_agent
from deepcrew.types import EventType
queue = asyncio.Queue()
agent = Agent("bot", model="openai/gpt-4o")
task = asyncio.create_task(
    run_agent(agent, [{"role": "user", "content": "Hi"}], queue=queue)
)
while True:
    event = await queue.get()
    if event is None:  # None marks the end of the stream
        break
    if event.event == EventType.TEXT_DELTA:
        print(event.data["chunk"], end="", flush=True)
    elif event.event == EventType.APEX_DONE:  # only emitted when APEX synthesis runs (e.g. via Orchestrator)
        print(f"\nConfidence: {event.data['confidence']:.2f}")
await task
Workflow Builder
Build explicit agent pipelines as a directed acyclic graph. Independent nodes at each level run in parallel automatically.
from deepcrew import Agent, WorkflowBuilder, ObservabilityConfig
obs = ObservabilityConfig(otel_endpoint="http://localhost:4317")
workflow = (
    WorkflowBuilder(observability=obs)  # v0.2.0: OTel tracing per step
    .add_agent("research", Agent("researcher", model="openai/gpt-4o-mini",
                                 system_prompt="Research the topic."), task="{input}")
    .add_agent("critique", Agent("critic", model="anthropic/claude-haiku-4-5-20251001",
                                 system_prompt="Find gaps."), task="Critique:\n{research}")
    .add_agent("expand", Agent("expander", model="gemini/gemini-2.0-flash",
                               system_prompt="Add technical depth."), task="Expand:\n{research}")
    .add_agent("report", Agent("writer", model="openai/gpt-4o",
                               system_prompt="Write a polished report."),
               task="Write a report.\n\nResearch:\n{research}\n\nCritique:\n{critique}\n\nDetails:\n{expand}")
    .then("research", "critique")
    .then("research", "expand")
    .then("critique", "report")
    .then("expand", "report")
)
# research → [critique + expand in parallel] → report
result = await workflow.run("Future of renewable energy")
print(result.final_output.text)
print(f"Total tokens: {result.total_tokens}")
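"Independent nodes at each level run in parallel" can be modeled as Kahn's topological sort grouped into levels, with each level executed through asyncio.gather. The sketch below is a hypothetical, stdlib-only model of that idea, not WorkflowBuilder's scheduler. Task templates are filled with str.format from earlier nodes' outputs plus "input", mirroring the {research} placeholders above, and run stands in for calling an agent.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable


def topo_levels(nodes: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Group nodes into levels; every node's predecessors sit in earlier levels."""
    indegree = {n: 0 for n in nodes}
    successors: dict[str, list[str]] = defaultdict(list)
    for before, after in edges:
        successors[before].append(after)
        indegree[after] += 1
    levels, level = [], [n for n in nodes if indegree[n] == 0]
    while level:
        levels.append(level)
        nxt = []
        for n in level:
            for m in successors[n]:
                indegree[m] -= 1
                if indegree[m] == 0:
                    nxt.append(m)
        level = nxt
    if sum(len(lv) for lv in levels) != len(nodes):
        raise ValueError("workflow graph contains a cycle")
    return levels


async def run_dag(tasks: dict[str, str], edges: list[tuple[str, str]],
                  run: Callable[[str, str], Awaitable[str]], initial_input: str) -> dict[str, str]:
    outputs = {"input": initial_input}
    for level in topo_levels(list(tasks), edges):
        prompts = [tasks[n].format(**outputs) for n in level]
        # Nodes in the same level do not depend on each other, so run them concurrently.
        results = await asyncio.gather(*(run(n, p) for n, p in zip(level, prompts)))
        outputs.update(zip(level, results))
    return outputs


async def fake_run(node: str, prompt: str) -> str:
    await asyncio.sleep(0.01)  # pretend to call a model
    return f"{node}({prompt})"


tasks = {"research": "{input}", "critique": "Critique: {research}",
         "expand": "Expand: {research}", "report": "{critique} + {expand}"}
edges = [("research", "critique"), ("research", "expand"),
         ("critique", "report"), ("expand", "report")]
print(topo_levels(list(tasks), edges))  # [['research'], ['critique', 'expand'], ['report']]
print(asyncio.run(run_dag(tasks, edges, fake_run, "solar"))["report"])
```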
Streaming workflow events
from deepcrew.types import EventType
async for event in workflow.stream("My topic"):
    if event.event == EventType.STEP_START:
        print(f"▶ {event.data['node']} starting...")
    elif event.event == EventType.TEXT_DELTA:
        print(event.data["chunk"], end="", flush=True)
    elif event.event == EventType.STEP_DONE:
        print(f"\n✓ {event.data['node']} done")
    elif event.event == EventType.DONE:
        print("\n\nFinal answer ready")
Orchestrator
Automated mode: a router LLM decides which agents to run, they execute in parallel, and APEX synthesizes the results with confidence scoring.
from deepcrew import Agent, Orchestrator, ApexConfig, tool
@tool
def search_web(query: str) -> str: ...
@tool
def run_code(code: str) -> str: ...
orch = Orchestrator(
    agents=[
        Agent("researcher", model="openai/gpt-4o-mini",
              system_prompt="Gather comprehensive facts."),
        Agent("analyst", model="anthropic/claude-haiku-4-5-20251001",
              system_prompt="Evaluate trends and risks."),
        Agent("coder", model="gemini/gemini-2.0-flash",
              system_prompt="Write and test code examples."),
    ],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(
        cite_sources=True,  # adds [source: agent_name] inline
        confidence_threshold=0.8,
        allow_tools=False,
    ),
    global_tools=[search_web, run_code],  # v0.2.0: router allocates per agent
    enable_spawn=True,                    # v0.2.0: agents can spawn sub-agents
    max_parallel_agents=5,
)
result = await orch.run("What are the key challenges of AI in healthcare?")
print(result.final_text)
# Streaming
async for event in orch.stream("..."):
    print(event.to_sse(), end="")
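The fan-out step (the router picks agents, and they run concurrently, at most max_parallel_agents at a time) can be modeled with an asyncio.Semaphore. In this hypothetical sketch the router is a plain function and "synthesis" is naive concatenation with [source: name] tags. That is a placeholder for APEX, not a description of how APEX scores or merges answers.

```python
import asyncio
from typing import Awaitable, Callable

AgentFn = Callable[[str], Awaitable[str]]
Router = Callable[[str, list[str]], list[str]]


async def fan_out(query: str, agents: dict[str, AgentFn], route: Router,
                  max_parallel_agents: int = 5) -> str:
    chosen = route(query, list(agents))  # the router decides which agents run
    limit = asyncio.Semaphore(max_parallel_agents)

    async def run_one(name: str) -> tuple[str, str]:
        async with limit:  # at most max_parallel_agents agents in flight
            return name, await agents[name](query)

    results = await asyncio.gather(*(run_one(n) for n in chosen))
    # Placeholder "synthesis": concatenate answers with inline source tags.
    return "\n".join(f"{text} [source: {name}]" for name, text in results)


stats = {"active": 0, "peak": 0}


def make_agent(label: str) -> AgentFn:
    async def agent(query: str) -> str:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # pretend to call a model
        stats["active"] -= 1
        return f"{label} on {query!r}"
    return agent


def pick_all(query: str, names: list[str]) -> list[str]:
    return names  # trivial router: every agent runs


agents = {n: make_agent(n) for n in ["researcher", "analyst", "coder", "critic"]}
print(asyncio.run(fan_out("AI in healthcare", agents, pick_all, max_parallel_agents=2)))
print("peak concurrency:", stats["peak"])  # 2
```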
Model Context Protocol
deepcrew-ai supports all three MCP transports: stdio, streamable HTTP, and SSE. Attach any combination to an agent via mcps=[...]; tools are discovered in parallel on first use.
| Transport | Class | Use case |
|---|---|---|
| stdio | StdioMCP | Subprocess MCP servers (npx, python, uvx) |
| streamable-HTTP | HTTPMCP | Remote MCP servers — modern protocol |
| SSE | SSEMCP | Legacy /sse + /messages protocol |
| multi | MCPManager | Aggregate any mix into one interface |
StdioMCP
from deepcrew import Agent, run_agent
from deepcrew.mcp import StdioMCP
async with StdioMCP(
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "."],
    env={"SOME_VAR": "value"},
) as mcp:
    agent = Agent("file_agent", model="openai/gpt-4o",
                  system_prompt="Help with file operations.", mcps=[mcp])
    result = await run_agent(agent, [{"role": "user", "content": "List Python files."}])
HTTPMCP
from deepcrew.mcp import HTTPMCP
async with HTTPMCP(
    url="https://my-mcp.example.com/mcp",
    headers={"Authorization": "Bearer sk-..."},
    timeout=30,
    retries=2,
) as mcp:
    tools = await mcp.list_tools()
    result = await mcp.call_tool("search", {"query": "hello world"})
SSEMCP
from deepcrew.mcp import SSEMCP
async with SSEMCP("http://localhost:3000", headers={"X-API-Key": "secret"}) as mcp:
    tools = await mcp.list_tools()
    result = await mcp.call_tool("my_tool", {"arg": "value"})
MCPManager
from deepcrew import Agent, run_agent
from deepcrew.mcp import MCPManager, StdioMCP, HTTPMCP
async with MCPManager([
    StdioMCP("npx", ["-y", "@modelcontextprotocol/server-filesystem", "."]),
    HTTPMCP("https://search-mcp.example.com/mcp"),
    HTTPMCP("https://calendar-mcp.example.com/mcp"),
]) as manager:
    agent = Agent("super_agent", model="openai/gpt-4o",
                  system_prompt="Use all available tools.", mcps=[manager])
    result = await run_agent(agent, [{"role": "user", "content": "..."}])
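MCPManager presents several servers as one tool set, and tool discovery happens in parallel on first use. The sketch below models that pattern with stdlib asyncio only. FakeServer and ToolAggregator are hypothetical, discovery runs once behind an asyncio.Lock and is then cached, and raising on duplicate tool names is an assumed policy; the docs do not say how MCPManager resolves name clashes.

```python
import asyncio
from typing import Any


class FakeServer:
    """Hypothetical stand-in for an MCP client exposing list_tools()."""

    def __init__(self, name: str, tools: list[str]) -> None:
        self.name, self.tools, self.list_calls = name, tools, 0

    async def list_tools(self) -> list[str]:
        self.list_calls += 1
        await asyncio.sleep(0.01)  # simulate a network round trip
        return self.tools


class ToolAggregator:
    """Merge tools from many servers, discovered lazily and in parallel."""

    def __init__(self, servers: list[Any]) -> None:
        self._servers = servers
        self._index: dict[str, Any] | None = None
        self._lock = asyncio.Lock()

    async def tool_index(self) -> dict[str, Any]:
        async with self._lock:  # concurrent first callers trigger only one discovery
            if self._index is None:
                listings = await asyncio.gather(*(s.list_tools() for s in self._servers))
                index: dict[str, Any] = {}
                for server, names in zip(self._servers, listings):
                    for tool in names:
                        if tool in index:
                            raise ValueError(f"duplicate tool name: {tool}")
                        index[tool] = server
                self._index = index
        return self._index

    async def owner_of(self, tool: str) -> str:
        return (await self.tool_index())[tool].name


async def demo() -> tuple[str, list[int]]:
    fs = FakeServer("filesystem", ["read_file", "list_dir"])
    search = FakeServer("search", ["web_search"])
    agg = ToolAggregator([fs, search])
    # Two concurrent lookups, but each server is listed only once.
    owners = await asyncio.gather(agg.owner_of("read_file"), agg.owner_of("web_search"))
    return ",".join(owners), [fs.list_calls, search.list_calls]


print(asyncio.run(demo()))
```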
API Reference
All public names are exported from the deepcrew package (import deepcrew). Items new in v0.2.0 are marked v0.2.
run_agent()
async def run_agent(
agent: Agent,
messages: list[dict[str, Any]],
*,
tool_defs: list[ToolDef] | None = None,
queue: asyncio.Queue[StreamEvent | None] | None = None,
agent_id: str | None = None,
observability: ObservabilityConfig | None = None, # v0.2.0
) -> AgentResult
Orchestrator
class Orchestrator:
    def __init__(
        self,
        agents: list[Agent],
        router_model: str = "openai/gpt-4o-mini",
        synthesizer_model: str | None = None,        # backward compat alias
        apex_model: str | None = None,               # v0.2.0
        apex_config: ApexConfig | None = None,       # v0.2.0
        router_system_prompt: str | None = None,
        synthesizer_system_prompt: str | None = None,
        max_parallel_agents: int = 5,
        global_tools: list[ToolDef] | None = None,   # v0.2.0
        enable_spawn: bool = False,                  # v0.2.0
    ) -> None
    async def run(self, query: str, context: dict | None = None) -> OrchestratorResult
    def stream(self, query: str, context: dict | None = None) -> AsyncGenerator[StreamEvent, None]
WorkflowBuilder
class WorkflowBuilder:
    def __init__(self, observability: ObservabilityConfig | None = None)  # v0.2.0
    def add_agent(self, name: str, agent: Agent, task: str | Callable = "{input}") -> WorkflowBuilder
    def then(self, predecessor: str, successor: str) -> WorkflowBuilder
    async def run(self, initial_input: str, context: dict | None = None) -> WorkflowResult
    def stream(self, initial_input: str, context: dict | None = None) -> AsyncGenerator[StreamEvent, None]
v0.2.0 classes
# APEX
class APEXSynthesizer:
    def __init__(self, model: str, config: ApexConfig | None = None)
    async def synthesize(self, query: str, results: list[AgentResult], ...) -> AgentResult
@dataclass
class ApexConfig:
    confidence_threshold: float = 0.7
    cite_sources: bool = True
    allow_tools: bool = False
    system_prompt: str | None = None
# Loop
@dataclass
class LoopConfig:
    max_iterations: int = 5
    convergence_fn: Callable[[AgentResult], bool] | None = None
    stop_condition: Callable[[AgentResult], bool] | None = None
    refine_prompt: str = "..."
async def run_agent_loop(agent, messages, *, tool_defs=None, queue=None, agent_id=None) -> AgentResult
async def search_loop(query, search_tool, *, agent, max_iterations=3, confidence_threshold=0.8, queue=None) -> AgentResult
# Memory
class MemoryProvider(ABC):  # store / retrieve / search / clear
class InMemoryProvider(MemoryProvider)
class FileMemoryProvider(MemoryProvider):
    def __init__(self, path: str | Path)
# Retry
@dataclass
class RetryPolicy:
    max_retries: int = 3
    backoff_seconds: float = 1.0
    retry_on: tuple[type[Exception], ...] = (Exception,)
    exponential: bool = True
@dataclass
class FallbackChain:
    models: list[str]
# Skills
class Skill(ABC): # name / description / parameters / execute(**kwargs) -> str / to_tool_def()
def skill(fn=None, *, name=None, description=None) # decorator
class SkillRegistry: # register / get / list_all / clear
class WebSearchSkill(Skill)
class SummarizeSkill(Skill):
    def __init__(self, model: str = "openai/gpt-4o-mini")
class CodeExecutionSkill(Skill):
    def __init__(self, timeout: float = 10.0)
# Spawning
@dataclass
class SpawnRequest:
    task: str
    tools: list[str] = field(default_factory=list)  # empty list by default
    model: str | None = None
    system_prompt: str | None = None
    max_turns: int = 5
class ToolAllocator:
    def __init__(self, router_model: str)
    async def allocate(self, task: str, all_tools: list[ToolDef], max_tools: int = 10) -> list[ToolDef]
async def spawn_agent(request, all_tool_defs, parent_queue=None, router_model=..., parent_agent_id=...) -> AgentResult
def make_spawn_tool(all_tool_defs, parent_queue, router_model, parent_agent_id) -> ToolDef
# Observability
@dataclass
class ObservabilityConfig:
    otel_endpoint: str | None = None
    service_name: str = "deepcrew"
    enabled: bool = True
    export_format: Literal["grpc", "http"] = "grpc"
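RetryPolicy and FallbackChain combine naturally: retry the current model with growing delays, then move on to the next model. The sketch below is one plausible reading of those fields, not the library's code. The delay formula backoff_seconds * 2**attempt, applying the same retry policy to every model in the chain, and the injectable sleep parameter (added so the schedule can be checked without waiting) are all assumptions.

```python
import asyncio
from typing import Awaitable, Callable


def backoff_delays(max_retries: int, backoff_seconds: float, exponential: bool) -> list[float]:
    """Delay before each retry (assumed formula: doubling when exponential)."""
    return [backoff_seconds * (2 ** i if exponential else 1) for i in range(max_retries)]


async def call_with_fallback(
    call: Callable[[str], Awaitable[str]],
    models: list[str],  # primary model first, then the fallback chain
    *,
    max_retries: int = 3,
    backoff_seconds: float = 1.0,
    exponential: bool = True,
    retry_on: tuple[type[Exception], ...] = (Exception,),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> str:
    if not models:
        raise ValueError("need at least one model")
    last_error: Exception | None = None
    for model in models:
        # The first attempt has no delay; each retry waits per the backoff schedule.
        for delay in [0.0, *backoff_delays(max_retries, backoff_seconds, exponential)]:
            if delay:
                await sleep(delay)
            try:
                return await call(model)
            except retry_on as exc:  # errors outside retry_on propagate immediately
                last_error = exc
    assert last_error is not None
    raise last_error


async def flaky(model: str) -> str:
    if model == "openai/gpt-4o":
        raise TimeoutError("primary model unavailable")
    return f"answer from {model}"


print(asyncio.run(call_with_fallback(
    flaky, ["openai/gpt-4o", "gemini/gemini-2.0-flash"], max_retries=2, backoff_seconds=0.01)))
```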
Result types
@dataclass
class AgentResult:
    agent_id: str
    text: str
    tool_calls: list[dict]
    input_tokens: int
    output_tokens: int
    model: str
    confidence: float | None  # v0.2.0: populated by APEX
    loop_iterations: int      # v0.2.0: outer loop count
    total_tokens: int         # property
@dataclass
class WorkflowResult:
    outputs: dict[str, AgentResult]
    final_output: AgentResult | None
    total_input_tokens: int
    total_output_tokens: int
    total_tokens: int         # property
@dataclass
class OrchestratorResult:
    final_text: str
    agent_results: list[AgentResult]
    router_result: AgentResult | None
    total_input_tokens: int
    total_output_tokens: int
    total_tokens: int         # property
Stream Events
Every event has event (EventType), agent_id (str), and data (dict). Call .to_sse() to encode as a Server-Sent Event string.
| Event | data keys | Emitted by |
|---|---|---|
| agent_start | model | run_agent at loop start |
| text_delta | chunk | Each streamed text token |
| thinking_delta | chunk | Thinking/reasoning tokens (where supported) |
| tool_call | tool, args | Before tool execution |
| tool_result | tool, result | After tool execution |
| agent_done | input_tokens, output_tokens | Agent loop complete |
| apex_start | agents | v0.2 APEX synthesis begins |
| apex_done | confidence | v0.2 APEX synthesis complete |
| loop_iteration | iteration, converged | v0.2 Each outer loop iteration |
| spawn_agent | task, requested_tools | v0.2 Sub-agent spawned |
| memory_retrieve | count | v0.2 Memories injected to context |
| memory_store | key | v0.2 Tool result stored to memory |
| retry_attempt | attempt, model, delay | v0.2 Before a retry |
| fallback_triggered | from_model, to_model | v0.2 Model switch |
| step_start | node | WorkflowBuilder: node begins |
| step_done | node | WorkflowBuilder: node finished |
| error | message | Any exception |
| done | final_text | Entire run complete |
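On the client side, the strings produced by .to_sse() can be decoded back into (event, data) pairs. Below is a minimal parser sketch. It assumes each event carries a JSON payload on its data: line, as the "event: text_delta\ndata: {...}\n\n" format suggests. parse_sse is a hypothetical helper, and it handles only LF line endings (the SSE format also allows CR and CRLF).

```python
import json
from typing import Any, Iterator


def parse_sse(stream: str) -> Iterator[tuple[str, dict[str, Any]]]:
    """Yield (event_name, data) pairs from an SSE text stream (LF line endings only)."""
    for block in stream.split("\n\n"):
        if not block.strip():
            continue
        event, data_lines = "message", []  # "message" is the SSE default event name
        for line in block.split("\n"):
            if line.startswith("event:"):
                event = line[len("event:"):].strip()
            elif line.startswith("data:"):
                value = line[len("data:"):]
                data_lines.append(value[1:] if value.startswith(" ") else value)
        yield event, (json.loads("\n".join(data_lines)) if data_lines else {})


raw = ('event: text_delta\ndata: {"chunk": "Hel"}\n\n'
       'event: text_delta\ndata: {"chunk": "lo"}\n\n'
       'event: done\ndata: {"final_text": "Hello"}\n\n')
text = "".join(d["chunk"] for name, d in parse_sse(raw) if name == "text_delta")
print(text)  # Hello
```

Dispatching on the event name, rather than on the shape of data, keeps the client aligned with the table above: each event type has a fixed set of data keys.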