deepcrew-ai v0.2.0
Eight major features that bring deepcrew-ai to Claude Code parity. Every feature is additive — your v0.1.0 code runs unchanged.
APEX Synthesizer
Confidence-scored, citation-aware result synthesis. Know how sure your AI is.
Agent Spawning
Claude Code-style: any agent can spawn sub-agents mid-loop with smart tool allocation.
Looping
Outer iteration loop for iterative search-refine patterns with convergence control.
Skills
Reusable capability bundles. Built-ins: WebSearch, Summarize, CodeExecution.
Memory Providers
Pluggable context stores auto-injected into each LLM call. InMemory and File backends.
Retry & Fallback
Per-agent exponential backoff with model fallback chains, so a transient error does not fail the whole run.
Observability
OpenTelemetry spans for every LLM call, tool execution, and workflow step.
CLI
deepcrew run workflow.yaml — declarative workflow execution from the terminal.
APEX Synthesizer
APEX replaces the original plain synthesizer with an intelligent synthesis engine. It produces a confidence score (0.0–1.0) on every result, optionally cites which agent contributed each fact, and can even call tools mid-synthesis for verification.
How it works
- All agent results are collected after parallel execution
- APEX receives the original query + all agent outputs in a structured prompt
- It synthesizes a unified answer and ends its response with CONFIDENCE: 0.85
- The confidence line is parsed out and stored on AgentResult.confidence
- When cite_sources=True, APEX adds [source: agent_name] inline markers
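The parsing steps above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: split_confidence and cited_agents are hypothetical helper names, and the exact line format APEX emits is assumed from the CONFIDENCE: 0.85 and [source: agent_name] examples in this section.

```python
import re

# Assumed format: the last line of the response looks like "CONFIDENCE: 0.85".
_CONFIDENCE_RE = re.compile(r"^\s*CONFIDENCE:\s*([0-9]*\.?[0-9]+)\s*$", re.IGNORECASE)


def split_confidence(raw: str, default: float = 0.0) -> tuple[str, float]:
    """Return (answer_text, confidence) from a raw synthesis response."""
    lines = raw.rstrip().splitlines()
    if lines:
        match = _CONFIDENCE_RE.match(lines[-1])
        if match:
            # Clamp to the documented 0.0-1.0 range.
            score = max(0.0, min(1.0, float(match.group(1))))
            return "\n".join(lines[:-1]).rstrip(), score
    # No trailing confidence line: keep the text and fall back to the default.
    return raw.rstrip(), default


def cited_agents(text: str) -> list[str]:
    """Collect agent names from inline [source: agent_name] markers, first-seen order."""
    seen: list[str] = []
    for name in re.findall(r"\[source:\s*([^\]]+?)\s*\]", text):
        if name not in seen:
            seen.append(name)
    return seen


raw = (
    "Inflation rises when demand outpaces supply [source: researcher].\n"
    "Monetary expansion amplifies it [source: analyst].\n"
    "CONFIDENCE: 0.85"
)
answer, confidence = split_confidence(raw)
print(confidence)
print(cited_agents(answer))
```

Falling back to a default score when the line is missing is a design choice of this sketch; a stricter parser could raise instead.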
Basic usage
from deepcrew import Agent, Orchestrator, ApexConfig

orch = Orchestrator(
    agents=[
        Agent("researcher", model="openai/gpt-4o-mini", system_prompt="Research facts."),
        Agent("analyst", model="anthropic/claude-haiku-4-5-20251001", system_prompt="Analyze data."),
    ],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(
        cite_sources=True,          # [source: researcher] / [source: analyst] inline
        confidence_threshold=0.75,  # warn if below (future: request more agents)
        allow_tools=False,          # set True to let APEX call tools mid-synthesis (experimental)
    ),
)

result = await orch.run("What causes inflation?")
print(result.final_text)
print(f"Confidence: {result.agent_results[-1].confidence:.2%}")
Standalone APEX
from deepcrew import APEXSynthesizer, ApexConfig, AgentResult

# Use APEX outside of Orchestrator, e.g. to synthesize cached results
apex = APEXSynthesizer(
    model="openai/gpt-4o",
    config=ApexConfig(cite_sources=True),
)
results: list[AgentResult] = [...]  # your pre-computed results
synthesis = await apex.synthesize(
    original_query="Explain quantum entanglement",
    results=results,
)
print(synthesis.text)
print(f"Confidence: {synthesis.confidence:.2f}")

# Citations
for citation in apex.build_citations(results, synthesis.text):
    print(f"[{citation.agent_id}] {citation.claim[:80]}")
ApexConfig reference
- EventType.APEX_DONE includes a below_threshold=True flag for consumers to handle.
- [source: agent_name] inline markers to attribute facts to specific agents.
- tool_defs to synthesize().
- CONFIDENCE: float.
APEX events
from deepcrew.types import EventType

async for event in orch.stream("..."):
    if event.event == EventType.APEX_START:
        agents = event.data["agents"]
        print(f"APEX synthesizing from {len(agents)} agents: {agents}")
    elif event.event == EventType.APEX_DONE:
        conf = event.data["confidence"]
        print(f"APEX done, confidence {conf:.2f}")
        if conf < 0.7:
            print("⚠ Low confidence: consider adding more specialist agents")
Agent Spawning
deepcrew-ai v0.2.0 introduces Claude Code-style dynamic agent spawning. Any running agent can call a built-in spawn_agent tool to create a sub-agent mid-loop, with tools automatically selected from a global pool by the ToolAllocator.
How it works
- You provide a global_tools pool to Orchestrator
- With enable_spawn=True, every agent gets a spawn_agent(task, tools, model) tool injected
- When an agent calls spawn_agent, ToolAllocator uses the router LLM to pick the most relevant tools from the global pool for that specific task
- A fresh sub-agent is created and runs to completion, its result returned to the parent
- A SPAWN_AGENT stream event is emitted for observability
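To make the allocation step concrete, here is a toy sketch of picking a bounded subset of tools for a task. It is a deliberate simplification: the real ToolAllocator is described as asking the router LLM, whereas this stand-in only counts shared words between the task and each tool's name and description. ToolInfo and allocate_by_overlap are hypothetical names used only for this illustration.

```python
import re
from dataclasses import dataclass

# Common words ignored when comparing a task with a tool description.
STOPWORDS = {"a", "an", "and", "the", "to", "of"}


@dataclass
class ToolInfo:
    """Hypothetical stand-in for a tool definition: a name and a description."""
    name: str
    description: str


def _words(text: str) -> set[str]:
    return set(re.findall(r"[a-z0-9]+", text.lower())) - STOPWORDS


def allocate_by_overlap(task: str, pool: list[ToolInfo], max_tools: int = 2) -> list[ToolInfo]:
    """Rank tools by how many task words appear in their name or description."""
    task_words = _words(task)
    scored = []
    for tool in pool:
        score = len(task_words & _words(f"{tool.name} {tool.description}"))
        if score > 0:
            scored.append((score, tool))
    # Highest score first; list.sort is stable, so ties keep pool order.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [tool for _, tool in scored[:max_tools]]


pool = [
    ToolInfo("search_web", "Search the web."),
    ToolInfo("read_file", "Read a local file."),
    ToolInfo("run_sql", "Execute a SQL query."),
    ToolInfo("call_api", "Make an HTTP API call."),
]
picked = allocate_by_overlap("Read the local config file and query the SQL database", pool)
print([t.name for t in picked])
```

Even in this crude form, the outcome depends entirely on what each description says, which is why descriptive tool docstrings matter for allocation.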
Enable spawning via Orchestrator
from deepcrew import Agent, Orchestrator, tool

@tool
def search_web(query: str) -> str:
    "Search the web."
    ...

@tool
def read_file(path: str) -> str:
    "Read a local file."
    ...

@tool
def run_sql(query: str) -> list[dict]:
    "Execute a SQL query."
    ...

@tool
def call_api(url: str, method: str = "GET") -> dict:
    "Make an HTTP API call."
    ...

master = Agent(
    name="coordinator",
    model="openai/gpt-4o",
    system_prompt="""You are a coordinator agent. For complex subtasks,
use the spawn_agent tool to delegate to a specialized sub-agent.""",
)

orch = Orchestrator(
    agents=[master],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    global_tools=[search_web, read_file, run_sql, call_api],  # pool
    enable_spawn=True,  # injects spawn_agent tool into master
)

result = await orch.run(
    "Research the top 3 AI papers from last month and summarize key findings."
)
print(result.final_text)
Standalone spawning
from deepcrew import spawn_agent, SpawnRequest

request = SpawnRequest(
    task="Find all Python files that import pandas and list their names.",
    tools=["read_file", "search_web"],  # hint: names from global pool
    model="openai/gpt-4o-mini",
    system_prompt="You are a code analysis assistant.",
    max_turns=5,
)

result = await spawn_agent(
    request=request,
    all_tool_defs=[search_web_def, read_file_def],  # tool definitions for the pool, defined elsewhere
    parent_queue=my_queue,                          # event queue owned by the caller, defined elsewhere
    router_model="openai/gpt-4o-mini",
    parent_agent_id="coordinator",
)
print(result.text)
ToolAllocator
from deepcrew import ToolAllocator

allocator = ToolAllocator(router_model="openai/gpt-4o-mini")

# Given a task description and a large pool of tools,
# returns only the most relevant subset (up to max_tools)
relevant_tools = await allocator.allocate(
    task="Analyze sentiment in customer reviews and generate a report",
    all_tools=my_tool_defs,  # list[ToolDef], could be dozens of tools
    max_tools=5,             # return at most 5
)
print([t.name for t in relevant_tools])
The ToolAllocator prompt includes each tool's name and description. A good tool description dramatically improves allocation accuracy.
SpawnRequest reference
Looping Methodology
The outer iteration loop is distinct from the inner per-turn max_turns cycle. The loop runs the entire agent (including all its tool calls) and re-runs it if the result doesn't meet a convergence criterion — ideal for search-refine, draft-critique, and iterative research patterns.
How it works
- Agent runs (all inner turns until no more tool calls)
- convergence_fn(result) is called; if it returns True, the loop exits
- If not converged, refine_prompt is appended and the agent runs again
- Loop exits when max_iterations is reached or convergence is achieved
- result.loop_iterations records how many outer iterations ran
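The control flow in the steps above can be sketched without any LLM. This is an illustration under stated assumptions, not the library's run_agent_loop: LoopResult, run_outer_loop, and fake_agent are hypothetical names, and the "agent" is a toy function whose answer grows each time it is asked to refine.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class LoopResult:
    """Hypothetical result type: final text plus the number of outer iterations."""
    text: str
    loop_iterations: int = 0


def run_outer_loop(
    run_once: Callable[[list[str]], str],
    prompt: str,
    convergence_fn: Callable[[LoopResult], bool],
    refine_prompt: str,
    max_iterations: int = 3,
) -> LoopResult:
    """Re-run `run_once` until `convergence_fn` accepts the result or the budget is spent."""
    messages = [prompt]
    result = LoopResult(text="")
    for iteration in range(1, max_iterations + 1):
        # One full agent run (in the library this would include all inner tool-call turns).
        result = LoopResult(text=run_once(messages), loop_iterations=iteration)
        if convergence_fn(result):
            break
        # Not converged: append the refine prompt and go around again.
        messages.append(refine_prompt)
    return result


def fake_agent(messages: list[str]) -> str:
    # Toy agent: the answer gains one sentence per message it has seen.
    return " ".join(f"Detail {i}." for i in range(1, len(messages) + 1))


result = run_outer_loop(
    fake_agent,
    "Explain CRISPR-Cas9",
    convergence_fn=lambda r: r.text.count("Detail") >= 3,
    refine_prompt="Your answer is incomplete. Expand it.",
    max_iterations=4,
)
print(result.loop_iterations, result.text)
```

Here the loop stops on the third pass because the convergence check is satisfied before the four-iteration budget runs out.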
Basic loop with convergence
from deepcrew import Agent, run_agent, LoopConfig, tool

@tool
def search_web(query: str) -> str:
    "Search the web for information."
    ...

agent = Agent(
    name="researcher",
    model="openai/gpt-4o-mini",
    system_prompt="You are a thorough researcher. Use search to gather comprehensive information.",
    tools=[search_web],
    loop_config=LoopConfig(
        max_iterations=4,
        # Convergence: result is long enough to be a proper answer
        convergence_fn=lambda r: len(r.text) > 800 and r.text.count("\n") > 5,
        # What to say to the agent if not converged
        refine_prompt="Your answer is incomplete. Search for more details and expand your response significantly.",
    ),
)

result = await run_agent(
    agent,
    [{"role": "user", "content": "Explain the mechanism of CRISPR-Cas9 gene editing"}],
)
print(result.text)
print(f"Iterations: {result.loop_iterations}")
run_agent_loop() directly
from deepcrew import run_agent_loop, LoopConfig

result = await run_agent_loop(
    agent=my_agent,
    messages=[{"role": "user", "content": "Draft an executive summary"}],
    tool_defs=None,
    queue=my_queue,
    agent_id="drafter",
)
search_loop() — confidence-based iteration
from deepcrew import search_loop, Agent, tool

@tool
def search_web(query: str) -> str:
    "Search the web."
    ...

agent = Agent("searcher", model="openai/gpt-4o-mini",
              system_prompt="You are a research agent.", tools=[search_web])

# Keeps searching until APEX confidence >= 0.8 or 3 iterations
result = await search_loop(
    query="What is the current state of nuclear fusion research?",
    search_tool=search_web,
    agent=agent,
    max_iterations=3,
    confidence_threshold=0.8,
)
Stop condition (early exit)
from deepcrew import Agent, LoopConfig, LoopConvergedError

def check_done(result):
    # Raise LoopConvergedError to stop immediately and return result
    if "FINAL ANSWER:" in result.text:
        raise LoopConvergedError(result)
    return False

agent = Agent("reasoner", model="openai/gpt-4o",
    system_prompt="When you have a final answer, prefix it with 'FINAL ANSWER:'.",
    loop_config=LoopConfig(
        max_iterations=6,
        convergence_fn=check_done,
    ))
LoopConfig reference
- AgentResult. Return True to stop. Raise LoopConvergedError(result) for immediate exit with that result.
- LoopConvergedError on its own, useful for externalizing early-exit logic.
- "Your answer is incomplete. Please search for more information and expand your response."
Loop events
from deepcrew.types import EventType

while True:
    event = await queue.get()
    if event is None:
        break
    if event.event == EventType.LOOP_ITERATION:
        i = event.data["iteration"]
        converged = event.data["converged"]
        print(f"Loop iteration {i}: {'converged' if converged else 'refining...'}")
Skills
Skills are higher-level capability bundles. They look identical to tools from the LLM's perspective (both become ToolDef), but can wrap multi-step logic, sub-agents, or external APIs internally. Three built-ins are included; you can also create custom skills with the @skill decorator.
Built-in skills
from deepcrew import Agent, run_agent
from deepcrew import WebSearchSkill, SummarizeSkill, CodeExecutionSkill

agent = Agent(
    name="assistant",
    model="openai/gpt-4o",
    system_prompt="You are a versatile AI assistant.",
    skills=[
        WebSearchSkill(),                            # DuckDuckGo Instant Answer API
        SummarizeSkill(model="openai/gpt-4o-mini"),  # LLM-backed summarization
        CodeExecutionSkill(timeout=15.0),            # sandboxed Python subprocess
    ],
)

result = await run_agent(agent, [
    {"role": "user", "content": "Search for Python async best practices, summarize them, then write a demo script and run it."}
])
print(result.text)
@skill decorator — custom skills
from deepcrew import skill, Agent

@skill(name="translate", description="Translate text to another language")
async def translate(text: str, target_language: str) -> str:
    """
    Args:
        text (str): The text to translate.
        target_language (str): Target language code, e.g. 'es', 'fr', 'ja'.
    """
    import httpx
    async with httpx.AsyncClient() as client:
        r = await client.post(
            "https://libretranslate.de/translate",
            json={"q": text, "source": "auto", "target": target_language},
        )
        r.raise_for_status()  # surface HTTP errors instead of a KeyError below
        return r.json()["translatedText"]

@skill(name="send_slack", description="Send a message to a Slack channel")
async def send_slack(channel: str, message: str) -> str:
    """
    Args:
        channel (str): Slack channel name (without #).
        message (str): Message text to send.
    """
    import os
    import httpx
    async with httpx.AsyncClient() as client:
        r = await client.post(
            "https://slack.com/api/chat.postMessage",
            headers={"Authorization": f"Bearer {os.environ['SLACK_BOT_TOKEN']}"},
            json={"channel": channel, "text": message},
        )
    data = r.json()
    # Slack reports API failures in the JSON body ("ok": false), often with HTTP 200
    if not data.get("ok"):
        return f"Slack error: {data.get('error', 'unknown')}"
    return f"Message sent to #{channel}"

agent = Agent(
    "comms",
    model="openai/gpt-4o",
    system_prompt="Help with communications and translations.",
    skills=[translate, send_slack],
)
Skill class — full custom implementation
from deepcrew import Agent
from deepcrew.skills.base import Skill

class DatabaseQuerySkill(Skill):
    name = "database_query"
    description = "Execute a read-only SQL query against the production database"
    parameters = {
        "type": "object",
        "properties": {
            "sql": {"type": "string", "description": "SQL SELECT query to execute"},
            "limit": {"type": "integer", "description": "Max rows to return", "default": 100},
        },
        "required": ["sql"],
    }

    def __init__(self, connection_string: str):
        self._conn_str = connection_string

    async def execute(self, sql: str, limit: int = 100) -> str:
        import asyncpg
        conn = await asyncpg.connect(self._conn_str)
        try:
            # Enforce read-only access in the database itself, and wrap the query so
            # the row cap applies even if it already has a LIMIT or a trailing ";"
            async with conn.transaction(readonly=True):
                query = f"SELECT * FROM ({sql.strip().rstrip(';')}) AS q LIMIT {int(limit)}"
                rows = await conn.fetch(query)
            return str([dict(r) for r in rows])
        finally:
            await conn.close()

# Use it:
agent = Agent("db_agent", model="openai/gpt-4o",
              skills=[DatabaseQuerySkill("postgresql://...")])
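The parameters dict in the class above is written by hand. A schema of the same shape could plausibly be derived from a function signature, which is one way a decorator-based skill might describe its arguments. The sketch below illustrates the idea with inspect; parameters_schema is a hypothetical helper, and nothing here is confirmed to match what the @skill decorator does internally.

```python
import inspect

# Map a few Python annotations to JSON Schema type names; anything else falls back to "string".
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def parameters_schema(func) -> dict:
    """Build a JSON-Schema-style 'parameters' object from a function signature."""
    properties: dict[str, dict] = {}
    required: list[str] = []
    for name, param in inspect.signature(func).parameters.items():
        prop = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            # No default value: the caller must supply this argument.
            required.append(name)
        else:
            prop["default"] = param.default
        properties[name] = prop
    return {"type": "object", "properties": properties, "required": required}


def database_query(sql: str, limit: int = 100) -> str:
    """Toy function used only to demonstrate schema extraction."""
    return ""


schema = parameters_schema(database_query)
print(schema)
```

Per-argument descriptions are missing from this output; they would have to come from somewhere else, for example the Args section of a docstring.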
SkillRegistry
from deepcrew import SkillRegistry
from deepcrew import WebSearchSkill, SummarizeSkill

# Register globally
SkillRegistry.register(WebSearchSkill())
SkillRegistry.register(SummarizeSkill())

# Retrieve by name
search = SkillRegistry.get("web_search")
summarize = SkillRegistry.get("summarize")

# List all registered skills
for skill in SkillRegistry.list_all():
    print(f"{skill.name}: {skill.description}")
Built-in skill reference
| Class | Tool name | Description | Config |
|---|---|---|---|
| WebSearchSkill | web_search | DuckDuckGo Instant Answer API. Returns top results. | None |
| SummarizeSkill | summarize | LLM-backed text summarization. | model="openai/gpt-4o-mini" |
| CodeExecutionSkill | execute_code | Runs Python in an isolated subprocess. | timeout=10.0 |
Memory Providers
Memory providers let agents maintain context across turns, runs, and even restarts. Relevant memories are injected into each LLM call as a system message, and tool results are stored automatically for later retrieval.
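As a rough picture of what injecting memories as a system message can look like, consider the sketch below. It is an assumption-laden illustration rather than the library's code: inject_memories is a hypothetical helper, and both the wording of the note and its position after the leading system prompt are choices made for this example.

```python
def inject_memories(messages: list[dict], memories: list[str]) -> list[dict]:
    """Return a copy of `messages` with retrieved memories added as one system message."""
    if not memories:
        return list(messages)
    note = {
        "role": "system",
        "content": "Relevant memories:\n" + "\n".join(f"- {m}" for m in memories),
    }
    # Assumption: the note goes right after any leading system messages,
    # so the agent's own system prompt still comes first.
    index = 0
    while index < len(messages) and messages[index]["role"] == "system":
        index += 1
    return messages[:index] + [note] + messages[index:]


messages = [
    {"role": "system", "content": "You are a helpful assistant with memory."},
    {"role": "user", "content": "What was my project about again?"},
]
augmented = inject_memories(messages, ["Alice is building a Python library."])
for message in augmented:
    print(message["role"], "|", message["content"])
```

The original message list is left untouched, so the same conversation can be replayed with a different set of memories.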
InMemoryProvider — short-term context
from deepcrew import Agent, run_agent, InMemoryProvider

memory = InMemoryProvider()
agent = Agent(
    name="assistant",
    model="openai/gpt-4o-mini",
    system_prompt="You are a helpful assistant with memory.",
    memory=memory,
)

# First interaction
result1 = await run_agent(agent, [
    {"role": "user", "content": "My name is Alice and I'm building a Python library."}
])
# Memory automatically stores tool results and LLM context

# Second interaction: the agent will remember Alice's project
result2 = await run_agent(agent, [
    {"role": "user", "content": "What was my project about again?"}
])
FileMemoryProvider — persistent context
from pathlib import Path
from deepcrew import Agent, run_agent, FileMemoryProvider

# Persists across process restarts
memory = FileMemoryProvider(Path.home() / ".deepcrew" / "my_agent_memory.json")
agent = Agent(
    name="persistent_bot",
    model="openai/gpt-4o-mini",
    memory=memory,
)
# All tool results are atomically written to the JSON file
# On next startup, memories are loaded and injected into context
Custom MemoryProvider
from deepcrew import Agent
from deepcrew.memory.base import MemoryProvider

class RedisMemoryProvider(MemoryProvider):
    """Redis-backed memory for distributed agent deployments."""

    def __init__(self, redis_url: str, ttl: int = 3600):
        import redis.asyncio as redis
        self._client = redis.from_url(redis_url)
        self._ttl = ttl

    async def store(self, key: str, value: str) -> None:
        await self._client.setex(key, self._ttl, value)

    async def retrieve(self, key: str) -> str | None:
        val = await self._client.get(key)
        return val.decode() if val is not None else None

    async def search(self, query: str, top_k: int = 5) -> list[str]:
        # Substring match on key names via SCAN (incremental, unlike the blocking KEYS).
        # Production code should use semantic search instead.
        results = []
        async for key in self._client.scan_iter(match=f"*{query[:20]}*"):
            val = await self._client.get(key)
            if val is not None:
                results.append(val.decode())
            if len(results) >= top_k:
                break
        return results

    async def clear(self) -> None:
        # Note: flushdb() wipes the whole Redis database, not only this agent's keys
        await self._client.flushdb()

agent = Agent("distributed", model="openai/gpt-4o",
              memory=RedisMemoryProvider("redis://localhost:6379"))
Memory events
from deepcrew.types import EventType

async for event in run_agent(agent, messages, queue=queue):
    if event.event == EventType.MEMORY_RETRIEVE:
        n = event.data["count"]
        print(f"Injected {n} memories into context")
    elif event.event == EventType.MEMORY_STORE:
        key = event.data["key"]
        print(f"Stored tool result to memory: {key}")
MemoryProvider ABC
from abc import ABC, abstractmethod

class MemoryProvider(ABC):
    @abstractmethod
    async def store(self, key: str, value: str) -> None: ...

    @abstractmethod
    async def retrieve(self, key: str) -> str | None: ...

    @abstractmethod
    async def search(self, query: str, top_k: int = 5) -> list[str]: ...

    @abstractmethod
    async def clear(self) -> None: ...
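For experimenting with the interface above without Redis or the file system, a dict-backed provider is enough. The sketch below redeclares the four-method interface locally so it runs on its own; DictMemoryProvider is a hypothetical class, and its case-insensitive substring search is a placeholder for whatever retrieval strategy a real backend would use.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Optional


class MemoryProvider(ABC):
    """Local copy of the four-method interface, so this sketch is self-contained."""

    @abstractmethod
    async def store(self, key: str, value: str) -> None: ...

    @abstractmethod
    async def retrieve(self, key: str) -> Optional[str]: ...

    @abstractmethod
    async def search(self, query: str, top_k: int = 5) -> list[str]: ...

    @abstractmethod
    async def clear(self) -> None: ...


class DictMemoryProvider(MemoryProvider):
    """Hypothetical toy provider: a dict plus case-insensitive substring search."""

    def __init__(self) -> None:
        self._items: dict[str, str] = {}

    async def store(self, key: str, value: str) -> None:
        self._items[key] = value

    async def retrieve(self, key: str) -> Optional[str]:
        return self._items.get(key)

    async def search(self, query: str, top_k: int = 5) -> list[str]:
        needle = query.lower()
        hits = [v for v in self._items.values() if needle in v.lower()]
        return hits[:top_k]  # insertion order, oldest first

    async def clear(self) -> None:
        self._items.clear()


async def demo() -> list[str]:
    memory = DictMemoryProvider()
    await memory.store("user:name", "The user's name is Alice.")
    await memory.store("user:project", "Alice is building a Python library.")
    return await memory.search("alice", top_k=1)


hits = asyncio.run(demo())
print(hits)
```

Because every method is async, a provider like this can later be swapped for a network-backed one without changing the calling code.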
Retry & Fallback Policies
Configure per-agent retry behavior with exponential backoff, and model fallback chains that activate when all retries fail.
Basic retry
from deepcrew import Agent, RetryPolicy

agent = Agent(
    name="resilient",
    model="openai/gpt-4o",
    system_prompt="Be helpful.",
    retry_policy=RetryPolicy(
        max_retries=3,          # try up to 3 more times after the first failure
        backoff_seconds=1.0,    # base wait between retries
        exponential=True,       # waits 1s, 2s, 4s before retries 1-3 (doubles each time)
        retry_on=(Exception,),  # retry on any exception (default)
    ),
)
Retry specific exceptions only
import litellm
from deepcrew import Agent, RetryPolicy

# Only retry on rate limit and connection errors
agent = Agent(
    "selective_retry",
    model="openai/gpt-4o",
    retry_policy=RetryPolicy(
        max_retries=5,
        backoff_seconds=2.0,
        retry_on=(
            litellm.RateLimitError,
            litellm.APIConnectionError,
            TimeoutError,
        ),
    ),
)
Fallback chain
from deepcrew import Agent, RetryPolicy, FallbackChain

agent = Agent(
    name="fault_tolerant",
    model="openai/gpt-4o",  # primary model
    retry_policy=RetryPolicy(
        max_retries=2,
        backoff_seconds=1.0,
    ),
    fallback_chain=FallbackChain(models=[
        "anthropic/claude-haiku-4-5-20251001",  # try first if gpt-4o fails all retries
        "gemini/gemini-2.0-flash",              # try second
        "ollama/llama3.2",                      # local fallback
    ]),
)
# Flow: gpt-4o → retry 1 → retry 2 → claude-haiku → retry 1 → retry 2 → gemini → ...
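The flow in the comment above can be reproduced with a small asyncio sketch. It is not the library's retry engine: backoff_delays, call_with_fallback, and the flaky toy backend are hypothetical names, and the base delay defaults to zero here so the example finishes instantly.

```python
import asyncio
from typing import Awaitable, Callable, Optional


def backoff_delays(max_retries: int, base: float, exponential: bool = True) -> list[float]:
    """Wait times before each retry: base, 2*base, 4*base, ... when exponential."""
    return [base * (2 ** i if exponential else 1) for i in range(max_retries)]


async def call_with_fallback(
    call: Callable[[str], Awaitable[str]],
    models: list[str],
    max_retries: int = 2,
    base: float = 0.0,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
) -> tuple[str, list[str]]:
    """Try each model in order; each gets one initial call plus `max_retries` retries."""
    attempts: list[str] = []
    last_error: Optional[BaseException] = None
    for model in models:
        for delay in [None, *backoff_delays(max_retries, base)]:
            if delay is not None:
                await asyncio.sleep(delay)  # back off before a retry
            attempts.append(model)
            try:
                return await call(model), attempts
            except retry_on as exc:
                last_error = exc  # retryable error: keep going
    raise RuntimeError("all models exhausted") from last_error


async def flaky(model: str) -> str:
    # Toy backend: the primary model always fails, the first fallback works.
    if model == "primary":
        raise TimeoutError("simulated transient failure")
    return f"answer from {model}"


text, attempts = asyncio.run(call_with_fallback(flaky, ["primary", "fallback-1", "fallback-2"]))
print(text)
print(attempts)
print(backoff_delays(3, 1.0))
```

Exceptions that are not listed in retry_on propagate immediately, which mirrors the advice to retry only errors that are likely to be transient.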
Retry events
from deepcrew.types import EventType

while True:
    event = await queue.get()
    if event is None:
        break
    if event.event == EventType.RETRY_ATTEMPT:
        data = event.data
        print(f"Retry {data['attempt']} on {data['model']}, waiting {data['delay']:.1f}s")
    elif event.event == EventType.FALLBACK_TRIGGERED:
        print(f"Falling back from {event.data['from_model']} → {event.data['to_model']}")
RetryPolicy reference
- max_retries=3 means up to 4 total calls per model.
- exponential=True, this doubles each retry.
- litellm.RateLimitError to avoid retrying logic errors.
Observability (OpenTelemetry)
deepcrew-ai emits OpenTelemetry spans for every LLM call, tool execution, and workflow step. When observability=None (the default), all span context managers are nullcontext(), so tracing adds effectively no overhead.
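The nullcontext() pattern mentioned above is easy to see in isolation. The sketch below uses only the standard library: RecordingTracer is a hypothetical in-memory tracer standing in for an OpenTelemetry tracer, and the span helper shows how the same with-statement can either record a span or do nothing.

```python
import time
from contextlib import contextmanager, nullcontext
from typing import Optional


class RecordingTracer:
    """Hypothetical tracer that records span names and durations in memory."""

    def __init__(self) -> None:
        self.spans: list[tuple[str, float]] = []

    @contextmanager
    def span(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the span when the block exits, even on error.
            self.spans.append((name, time.perf_counter() - start))


def span(tracer: Optional[RecordingTracer], name: str):
    """Return a real span when tracing is on, otherwise a do-nothing context manager."""
    return tracer.span(name) if tracer is not None else nullcontext()


def handle_request(tracer: Optional[RecordingTracer] = None) -> str:
    with span(tracer, "deepcrew.agent.run"):
        with span(tracer, "deepcrew.llm.call"):
            reply = "hello"
    return reply


tracer = RecordingTracer()
handle_request(tracer)   # records two spans
handle_request(None)     # same code path, nothing recorded
print([name for name, _ in tracer.spans])
```

The inner span is recorded first because spans are appended when their block exits.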
Installation
pip install "deepcrew-ai[otel]"
# Installs: opentelemetry-api, opentelemetry-sdk, opentelemetry-exporter-otlp
Quick start with Jaeger
from deepcrew import Agent, Orchestrator, ObservabilityConfig, ApexConfig

obs = ObservabilityConfig(
    otel_endpoint="http://localhost:4317",  # gRPC endpoint
    service_name="my-ai-app",
    enabled=True,
    export_format="grpc",  # or "http" for HTTP/protobuf
)

orch = Orchestrator(
    agents=[
        Agent("researcher", model="openai/gpt-4o-mini", system_prompt="Research."),
        Agent("writer", model="anthropic/claude-haiku-4-5-20251001", system_prompt="Write."),
    ],
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(cite_sources=True),
)

# Pass observability to run(); it is propagated to all agents automatically
# (Note: Orchestrator.run() accepts **kwargs forwarded to run_agent)
result = await orch.run("Explain blockchain technology", observability=obs)
With run_agent()
from deepcrew import Agent, run_agent, ObservabilityConfig

obs = ObservabilityConfig(otel_endpoint="http://localhost:4317")
result = await run_agent(
    agent,
    [{"role": "user", "content": "Hello!"}],
    observability=obs,  # that's it
)
# Spans emitted:
# deepcrew.agent.run → covers entire agent lifecycle
# deepcrew.llm.call → each LLM request (includes model, agent_id, tokens)
# deepcrew.tool.call → each tool execution (includes tool_name, agent_id)
With WorkflowBuilder
from deepcrew import WorkflowBuilder, ObservabilityConfig

obs = ObservabilityConfig(otel_endpoint="http://localhost:4317", service_name="workflow-app")
workflow = (
    WorkflowBuilder(observability=obs)  # pass at construction time
    .add_agent("step1", agent1, task="{input}")
    .add_agent("step2", agent2, task="Refine:\n{step1}")
    .then("step1", "step2")
)
result = await workflow.run("My query")
# Spans emitted per step:
# deepcrew.workflow.step → covers each DAG node
# deepcrew.agent.run → the agent within the step
# deepcrew.llm.call
# deepcrew.tool.call
Start Jaeger locally (Docker)
docker run -d --name jaeger \
  -e COLLECTOR_OTLP_ENABLED=true \
  -p 6831:6831/udp \
  -p 16686:16686 \
  -p 4317:4317 \
  jaegertracing/all-in-one:latest
# Open http://localhost:16686 to view traces
ObservabilityConfig reference
- http://localhost:4317. For HTTP: http://localhost:4318/v1/traces.
- "grpc" for port 4317, "http" for port 4318.
Span attributes
| Span | Attributes |
|---|---|
| deepcrew.agent.run | agent.id, llm.model |
| deepcrew.llm.call | agent.id, llm.model, llm.input_tokens, llm.output_tokens |
| deepcrew.tool.call | agent.id, tool.name |
| deepcrew.workflow.step | step.name |
CLI — deepcrew run
Run declarative YAML workflow files from the terminal. No Python code required for simple workflows.
Installation check
pip install deepcrew-ai
deepcrew --version
# deepcrew-ai 0.2.0
Write a workflow YAML
agents:
  - name: researcher
    model: openai/gpt-4o-mini
    system_prompt: Research the topic thoroughly using all available information.
    tools:
      - web_search # built-in skill by name
  - name: analyst
    model: anthropic/claude-haiku-4-5-20251001
    system_prompt: Critically analyze the research findings. Identify gaps and strengths.
  - name: writer
    model: openai/gpt-4o
    system_prompt: Write a clear, well-structured executive summary report.
    tools:
      - summarize # built-in summarize skill
workflow:
  - step: research
    agent: researcher
    task: "{input}"
  - step: analysis
    agent: analyst
    task: |
      Analyze this research:
      {research}
    depends_on:
      - research
  - step: report
    agent: writer
    task: |
      Write an executive summary based on:
      Research: {research}
      Analysis: {analysis}
    depends_on:
      - research
      - analysis
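How depends_on and the {input} / {step_name} placeholders interact can be sketched in a few lines of Python. This is an illustration only, not the CLI's implementation: STEPS is a hand-written dict mirroring the workflow section above, run_workflow and fake_step are hypothetical names, and each step is replaced by a stub instead of a real agent call.

```python
from graphlib import TopologicalSorter
from typing import Callable

# Hypothetical in-memory form of the workflow section above.
STEPS = {
    "research": {"task": "{input}", "depends_on": []},
    "analysis": {"task": "Analyze this research:\n{research}", "depends_on": ["research"]},
    "report": {
        "task": "Research: {research}\nAnalysis: {analysis}",
        "depends_on": ["research", "analysis"],
    },
}


def run_workflow(
    steps: dict, user_input: str, run_step: Callable[[str, str], str]
) -> tuple[list[str], dict[str, str]]:
    """Run steps in dependency order, filling {input} and {step_name} placeholders."""
    graph = {name: set(spec["depends_on"]) for name, spec in steps.items()}
    outputs = {"input": user_input}
    order = list(TopologicalSorter(graph).static_order())
    for name in order:
        # Every dependency has already run, so its output is available here.
        task = steps[name]["task"].format(**outputs)
        outputs[name] = run_step(name, task)
    return order, outputs


seen: dict[str, str] = {}


def fake_step(name: str, task: str) -> str:
    # Stub for an agent call: remember the rendered task and return a marker.
    seen[name] = task
    return f"<{name} output>"


order, outputs = run_workflow(STEPS, "The future of autonomous vehicles", fake_step)
print(order)
print(seen["report"])
```

str.format is used here for brevity; task templates containing literal braces (for example JSON snippets) would need escaping or a different substitution scheme.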
Run it
# Stream output to terminal
deepcrew run workflow.yaml --input "The future of autonomous vehicles"
# Non-streaming (prints only final result)
deepcrew run workflow.yaml --input "Quantum computing in 2026" --no-stream
# List all agents in a config
deepcrew agents list --config workflow.yaml
YAML schema
- web_search, summarize, execute_code.
- {step_name} in subsequent task templates.
- name.
- {input} is the CLI --input value. {step_name} is the text output of that step.
Supported built-in tool names
| YAML name | Skill class | Description |
|---|---|---|
| web_search | WebSearchSkill | DuckDuckGo search |
| summarize | SummarizeSkill | LLM-backed summarization |
| execute_code | CodeExecutionSkill | Python subprocess execution |