Real-world examples
across every domain.
Complete, runnable examples for legal, financial, medical, DevOps, e-commerce, and more — using OpenAI, Anthropic, Gemini, Bedrock, Azure, and local Ollama models.
Provider Quick Reference
deepcrew-ai uses LiteLLM for all LLM calls — pass any provider's model string to Agent(model=...).
import os
# ── OpenAI ──────────────────────────────────────────────────────────────────
os.environ["OPENAI_API_KEY"] = "sk-..."
Agent("a", model="openai/gpt-4o")
Agent("a", model="openai/gpt-4o-mini")
Agent("a", model="openai/o3") # reasoning model
# ── Anthropic ────────────────────────────────────────────────────────────────
os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
Agent("a", model="anthropic/claude-opus-4-8")
Agent("a", model="anthropic/claude-sonnet-4-6")
Agent("a", model="anthropic/claude-haiku-4-5-20251001")
# ── Google Gemini ─────────────────────────────────────────────────────────────
os.environ["GEMINI_API_KEY"] = "AIza..."
Agent("a", model="gemini/gemini-2.5-pro")
Agent("a", model="gemini/gemini-2.0-flash")
Agent("a", model="gemini/gemini-2.0-flash-lite")
# ── AWS Bedrock ───────────────────────────────────────────────────────────────
os.environ["AWS_ACCESS_KEY_ID"] = "..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
Agent("a", model="bedrock/anthropic.claude-opus-4-8-20250514-v1:0")
Agent("a", model="bedrock/amazon.nova-pro-v1:0")
Agent("a", model="bedrock/meta.llama3-70b-instruct-v1:0")
# ── Azure OpenAI ──────────────────────────────────────────────────────────────
os.environ["AZURE_API_KEY"] = "..."
os.environ["AZURE_API_BASE"] = "https://YOUR-RESOURCE.openai.azure.com"
os.environ["AZURE_API_VERSION"] = "2024-10-21"
Agent("a", model="azure/gpt-4o")
Agent("a", model="azure/gpt-4o-mini")
# ── Groq (fast inference) ─────────────────────────────────────────────────────
os.environ["GROQ_API_KEY"] = "gsk_..."
Agent("a", model="groq/llama-3.3-70b-versatile")
Agent("a", model="groq/mixtral-8x7b-32768")
# ── Cohere ────────────────────────────────────────────────────────────────────
os.environ["COHERE_API_KEY"] = "..."
Agent("a", model="cohere/command-r-plus")
# ── Mistral ───────────────────────────────────────────────────────────────────
os.environ["MISTRAL_API_KEY"] = "..."
Agent("a", model="mistral/mistral-large-latest")
Agent("a", model="mistral/codestral-latest")
# ── Local via Ollama ──────────────────────────────────────────────────────────
# (no API key needed — run: ollama pull llama3.2)
Agent("a", model="ollama/llama3.2")
Agent("a", model="ollama/qwen2.5-coder:7b")
Agent("a", model="ollama/phi3.5")
Agent("a", model="ollama/mistral")
Provider comparison
| Provider | Best for | Env var |
|---|---|---|
| OpenAI GPT-4o | General tasks, code, reasoning | OPENAI_API_KEY |
| Anthropic Claude | Long context, nuanced writing, safety | ANTHROPIC_API_KEY |
| Google Gemini | Multimodal, long context, speed | GEMINI_API_KEY |
| AWS Bedrock | Regulated industries, no data egress | AWS_* |
| Azure OpenAI | Enterprise compliance, SLA guarantees | AZURE_API_* |
| Groq | Ultra-low latency inference | GROQ_API_KEY |
| Ollama | Fully local, air-gapped, no cost | None |
Multi-Model Research Pipeline
A three-agent research pipeline where each agent uses a different provider, with APEX synthesis and source citation.
"""
Multi-model research pipeline.
Providers: OpenAI (researcher) + Anthropic (analyst) + Gemini (writer) → OpenAI APEX
"""
import asyncio
from deepcrew import Agent, Orchestrator, ApexConfig, tool
from deepcrew import WebSearchSkill, LoopConfig, RetryPolicy
@tool
def search_arxiv(query: str, max_results: int = 5) -> list[dict]:
"""Search arXiv for academic papers.
Args:
query (str): Search query.
max_results (int): Maximum results.
"""
import httpx
resp = httpx.get(
"https://export.arxiv.org/api/query",
params={"search_query": f"all:{query}", "max_results": max_results},
)
# Parse Atom XML (simplified)
import re
titles = re.findall(r"(.*?) ", resp.text)[1:] # skip feed title
summaries = re.findall(r"(.*?) ", resp.text, re.DOTALL)
return [{"title": t.strip(), "summary": s.strip()[:200]} for t, s in zip(titles, summaries)]
@tool
def fetch_wikipedia(topic: str) -> str:
"""Fetch Wikipedia article summary for a topic.
Args:
topic (str): Topic to look up.
"""
import httpx
resp = httpx.get(
"https://en.wikipedia.org/api/rest_v1/page/summary/" + topic.replace(" ", "_")
)
data = resp.json()
return data.get("extract", "Not found.")[:1000]
researcher = Agent(
name="researcher",
model="openai/gpt-4o-mini",
system_prompt="You are an expert researcher. Use all tools to gather comprehensive, accurate information. Prioritize recent sources.",
tools=[search_arxiv, fetch_wikipedia],
skills=[WebSearchSkill()],
retry_policy=RetryPolicy(max_retries=2),
loop_config=LoopConfig(
max_iterations=3,
convergence_fn=lambda r: len(r.text) > 600,
refine_prompt="Your research is thin. Search for more specific papers and sources.",
),
)
analyst = Agent(
name="analyst",
model="anthropic/claude-haiku-4-5-20251001",
system_prompt="You are a critical analyst. Evaluate the research, identify patterns, assess reliability, and highlight key insights and limitations.",
retry_policy=RetryPolicy(max_retries=2),
)
writer = Agent(
name="writer",
model="gemini/gemini-2.0-flash",
system_prompt="You are a science communicator. Write clear, engaging summaries that are accurate and accessible to a general audience.",
)
orch = Orchestrator(
agents=[researcher, analyst, writer],
router_model="openai/gpt-4o-mini",
apex_model="openai/gpt-4o",
apex_config=ApexConfig(cite_sources=True, confidence_threshold=0.75),
max_parallel_agents=3,
)
async def main():
result = await orch.run("What are the latest breakthroughs in room-temperature superconductivity?")
print(result.final_text)
print(f"\nConfidence: {result.agent_results[-1].confidence:.2%}")
print(f"Total tokens: {result.total_tokens:,}")
asyncio.run(main())
Legal Research Assistant
Legal research with Azure OpenAI for enterprise compliance — multi-agent contract review, case law search, and risk assessment.
"""
Legal research assistant using Azure OpenAI.
Azure provides SOC2/HIPAA compliance, data residency, and SLA guarantees.
"""
import asyncio
import os
from deepcrew import Agent, WorkflowBuilder, ObservabilityConfig, tool
os.environ["AZURE_API_KEY"] = "your-azure-key"
os.environ["AZURE_API_BASE"] = "https://your-resource.openai.azure.com"
os.environ["AZURE_API_VERSION"] = "2024-10-21"
@tool
def search_case_law(query: str, jurisdiction: str = "US", max_results: int = 5) -> list[dict]:
"""Search case law database.
Args:
query (str): Legal search query.
jurisdiction (str): Jurisdiction code (US, UK, EU, etc.).
max_results (int): Maximum results to return.
"""
# In production: connect to Westlaw, LexisNexis, or CourtListener API
return [
{"case": f"Sample v. Example, {2020 + i}", "holding": f"Court held that...", "relevance": 0.9 - i * 0.1}
for i in range(min(max_results, 3))
]
@tool
def analyze_contract_clause(clause_text: str, clause_type: str) -> dict:
"""Analyze a contract clause for potential risks.
Args:
clause_text (str): The contract clause text.
clause_type (str): Type of clause (e.g., 'indemnification', 'limitation_of_liability', 'IP_assignment').
"""
return {
"clause_type": clause_type,
"risk_level": "medium",
"issues": ["Overly broad indemnification scope", "Missing liability cap"],
"recommendations": ["Narrow indemnification to third-party claims only", "Add $X liability cap"],
}
obs = ObservabilityConfig(
otel_endpoint="http://your-otel-collector:4317",
service_name="legal-ai",
)
contract_reviewer = Agent(
name="contract_reviewer",
model="azure/gpt-4o",
system_prompt="""You are an expert contract lawyer. Review contracts clause by clause.
Identify risks, ambiguities, and unfavorable terms. Be precise and cite specific language.""",
tools=[analyze_contract_clause],
max_turns=15,
temperature=0.1, # low temperature for consistent legal analysis
)
case_law_researcher = Agent(
name="case_law_researcher",
model="azure/gpt-4o",
system_prompt="""You are a legal researcher specializing in case law.
Find relevant precedents and explain their applicability.""",
tools=[search_case_law],
max_turns=10,
)
risk_assessor = Agent(
name="risk_assessor",
model="azure/gpt-4o",
system_prompt="""You are a legal risk assessment specialist.
Given contract review findings and relevant case law, produce a structured risk report
with severity ratings (High/Medium/Low) and specific recommended mitigations.""",
)
workflow = (
WorkflowBuilder(observability=obs)
.add_agent("review", contract_reviewer, task="Review this contract:\n\n{input}")
.add_agent("precedents", case_law_researcher, task="Find case law relevant to:\n\n{input}")
.add_agent("risk_report", risk_assessor,
task="Based on the contract review and case law, produce a risk report.\n\nReview:\n{review}\n\nCase law:\n{precedents}")
.then("review", "risk_report")
.then("precedents", "risk_report")
)
async def analyze_contract(contract_text: str):
result = await workflow.run(contract_text)
return result.final_output.text
asyncio.run(analyze_contract("""
Section 12. Indemnification. Vendor shall indemnify and hold harmless Company...
"""))
Automated Code Review
Multi-agent code review pipeline: security scanning, performance analysis, and style checking — each with a specialized model.
"""
Automated code review using specialized models per concern.
Claude for security (strong reasoning), Groq for speed on style checks.
"""
import asyncio
from deepcrew import Agent, Orchestrator, ApexConfig, RetryPolicy, FallbackChain, tool
from deepcrew import CodeExecutionSkill
@tool
def run_linter(code: str, language: str = "python") -> dict:
"""Run a static linter on code.
Args:
code (str): Source code to lint.
language (str): Programming language.
"""
import subprocess, tempfile, os
if language != "python":
return {"issues": [], "language": language}
with tempfile.NamedTemporaryFile(suffix=".py", mode="w", delete=False) as f:
f.write(code)
fname = f.name
try:
result = subprocess.run(
["python", "-m", "py_compile", fname],
capture_output=True, text=True, timeout=10
)
return {"syntax_ok": result.returncode == 0, "errors": result.stderr}
finally:
os.unlink(fname)
@tool
def check_dependencies(requirements_txt: str) -> list[dict]:
"""Check dependencies for known vulnerabilities.
Args:
requirements_txt (str): Content of requirements.txt
"""
# In production: use pip-audit or safety API
lines = [l.strip() for l in requirements_txt.splitlines() if l.strip()]
return [{"package": line, "status": "ok", "vulnerabilities": []} for line in lines]
security_auditor = Agent(
name="security_auditor",
model="anthropic/claude-opus-4-8", # Claude has strong security reasoning
system_prompt="""You are a senior security engineer. Audit code for:
- Injection vulnerabilities (SQL, command, LDAP)
- Authentication/authorization flaws
- Insecure data handling (secrets, PII)
- Dependency vulnerabilities
- Cryptographic weaknesses
Be specific: cite line numbers and provide remediation code.""",
tools=[check_dependencies, run_linter],
retry_policy=RetryPolicy(max_retries=2),
fallback_chain=FallbackChain(["anthropic/claude-haiku-4-5-20251001"]),
temperature=0.1,
)
performance_analyst = Agent(
name="performance_analyst",
model="openai/gpt-4o",
system_prompt="""You are a performance engineering expert. Identify:
- O(n²) or worse algorithmic complexity
- Unnecessary I/O in loops
- Missing caching opportunities
- Memory leaks or excessive allocations
- Blocking calls in async contexts
Provide Big-O analysis and optimization suggestions.""",
skills=[CodeExecutionSkill(timeout=10.0)], # can run benchmarks
temperature=0.2,
)
style_checker = Agent(
name="style_checker",
model="groq/llama-3.3-70b-versatile", # Groq is very fast for style checks
system_prompt="""You are a code quality reviewer. Check for:
- Naming conventions and clarity
- Function/class size and single responsibility
- Documentation completeness
- Test coverage gaps
- Code duplication
Be constructive and provide specific improvement suggestions.""",
temperature=0.3,
)
orch = Orchestrator(
agents=[security_auditor, performance_analyst, style_checker],
router_model="openai/gpt-4o-mini",
apex_model="openai/gpt-4o",
apex_config=ApexConfig(
cite_sources=True,
confidence_threshold=0.8,
),
)
async def review_code(code: str) -> str:
result = await orch.run(f"Review this code:\n\n```python\n{code}\n```")
return result.final_text
CODE_TO_REVIEW = """
import sqlite3
import os
def get_user(username):
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query)
return cursor.fetchall()
def process_data(items):
results = []
for item in items:
for other in items: # O(n^2)
if item['id'] != other['id']:
results.append(item['value'] * other['value'])
return results
"""
asyncio.run(review_code(CODE_TO_REVIEW))
Financial Analysis Pipeline
Financial data analysis running entirely on AWS Bedrock — no data leaves your VPC. Covers earnings analysis, risk assessment, and investment thesis generation.
"""
Financial analysis on AWS Bedrock — data stays in your VPC.
Requires: AWS credentials with Bedrock model access.
"""
import asyncio
import os
from deepcrew import Agent, WorkflowBuilder, tool, RetryPolicy, InMemoryProvider
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
@tool
def fetch_financials(ticker: str, period: str = "annual") -> dict:
"""Fetch financial statements for a stock ticker.
Args:
ticker (str): Stock ticker symbol (e.g., 'AAPL', 'MSFT').
period (str): 'annual' or 'quarterly'.
"""
# In production: use yfinance, Alpha Vantage, or SEC EDGAR API
return {
"ticker": ticker,
"revenue": [100_000_000, 115_000_000, 132_000_000],
"net_income": [15_000_000, 18_000_000, 22_000_000],
"eps": [2.50, 3.10, 3.75],
"debt_to_equity": 0.45,
"free_cash_flow": 28_000_000,
}
@tool
def get_market_data(ticker: str) -> dict:
"""Get current market data and valuation metrics.
Args:
ticker (str): Stock ticker symbol.
"""
return {
"price": 185.50,
"pe_ratio": 28.5,
"ps_ratio": 7.2,
"52w_high": 201.00,
"52w_low": 142.00,
"market_cap": 2_900_000_000_000,
}
@tool
def fetch_news_sentiment(ticker: str, days: int = 30) -> dict:
"""Fetch recent news sentiment for a stock.
Args:
ticker (str): Stock ticker symbol.
days (int): Number of days of history.
"""
return {
"overall_sentiment": "positive",
"sentiment_score": 0.72,
"positive_articles": 45,
"negative_articles": 12,
"key_topics": ["AI products", "strong earnings", "market expansion"],
}
memory = InMemoryProvider()
fundamental_analyst = Agent(
name="fundamental_analyst",
model="bedrock/anthropic.claude-opus-4-8-20250514-v1:0",
system_prompt="""You are a CFA-level fundamental analyst.
Analyze revenue growth, margins, cash generation, and balance sheet health.
Compute key ratios and compare to industry averages.
Identify earnings quality issues or accounting concerns.""",
tools=[fetch_financials, get_market_data],
memory=memory,
retry_policy=RetryPolicy(max_retries=3, backoff_seconds=2.0),
)
sentiment_analyst = Agent(
name="sentiment_analyst",
model="bedrock/amazon.nova-pro-v1:0",
system_prompt="""You are a market sentiment specialist.
Analyze news flow, social sentiment, and management guidance.
Identify catalysts (positive and negative) and their probability-weighted impact.""",
tools=[fetch_news_sentiment],
)
risk_manager = Agent(
name="risk_manager",
model="bedrock/meta.llama3-70b-instruct-v1:0",
system_prompt="""You are a portfolio risk manager.
Given fundamental and sentiment analysis, assess:
- Valuation risk (over/undervalued by how much?)
- Business model risks (competition, regulation, technology)
- Macro risks (rates, FX, recession sensitivity)
- Position sizing recommendation (% of portfolio)""",
)
investment_thesis_writer = Agent(
name="thesis_writer",
model="bedrock/anthropic.claude-opus-4-8-20250514-v1:0",
system_prompt="""You are an investment strategist writing for a hedge fund IC memo.
Write a structured investment thesis with: Executive Summary, Bull/Bear cases,
Price Target (1-year), Catalysts, Key Risks, and a clear Buy/Hold/Sell recommendation.""",
)
workflow = (
WorkflowBuilder()
.add_agent("fundamentals", fundamental_analyst, task="Analyze the fundamentals of {input}")
.add_agent("sentiment", sentiment_analyst, task="Analyze market sentiment for {input}")
.add_agent("risk", risk_manager,
task="Assess risks for {input}.\n\nFundamentals:\n{fundamentals}\n\nSentiment:\n{sentiment}")
.add_agent("thesis", investment_thesis_writer,
task="Write an investment thesis for {input}.\n\nFundamentals:\n{fundamentals}\n\nSentiment:\n{sentiment}\n\nRisk assessment:\n{risk}")
.then("fundamentals", "risk")
.then("sentiment", "risk")
.then("fundamentals", "thesis")
.then("sentiment", "thesis")
.then("risk", "thesis")
)
async def main():
result = await workflow.run("NVDA") # Analyze Nvidia
print(result.final_output.text)
asyncio.run(main())
Customer Support Automation
High-volume customer support triage and response using Gemini Flash for low-latency classification, with Anthropic Claude handling complex escalations.
"""
Customer support pipeline:
- Gemini Flash for fast triage/classification (low latency, high volume)
- Claude for complex, empathetic escalation responses
- Memory to track customer history
"""
import asyncio
from deepcrew import (
Agent, Orchestrator, ApexConfig, tool,
FileMemoryProvider, RetryPolicy, FallbackChain, LoopConfig,
)
@tool
def lookup_order(order_id: str) -> dict:
"""Look up order status and details.
Args:
order_id (str): The order ID (format: ORD-XXXXX).
"""
return {
"order_id": order_id,
"status": "shipped",
"shipped_date": "2026-06-25",
"estimated_delivery": "2026-07-01",
"items": [{"name": "Widget Pro", "qty": 2, "price": 49.99}],
"carrier": "FedEx",
"tracking": "1234567890",
}
@tool
def lookup_customer(email: str) -> dict:
"""Look up customer account and history.
Args:
email (str): Customer email address.
"""
return {
"email": email,
"name": "Jane Smith",
"tier": "premium",
"lifetime_orders": 47,
"account_age_days": 820,
"open_tickets": 0,
}
@tool
def create_refund(order_id: str, reason: str, amount: float | None = None) -> dict:
"""Create a refund for an order.
Args:
order_id (str): Order to refund.
reason (str): Refund reason.
amount (float): Amount to refund (None = full refund).
"""
return {"refund_id": f"REF-{order_id}", "status": "approved", "amount": amount or "full"}
customer_memory = FileMemoryProvider("~/.deepcrew/support_memory.json")
triage_agent = Agent(
name="triage",
model="gemini/gemini-2.0-flash-lite", # fastest/cheapest for classification
system_prompt="""You are a customer support triage agent.
Classify the ticket and extract key information:
- Category: billing/shipping/technical/returns/general
- Urgency: low/medium/high/critical
- Customer sentiment: positive/neutral/frustrated/angry
- Key entities: order IDs, product names, dates""",
memory=customer_memory,
)
resolver_agent = Agent(
name="resolver",
model="gemini/gemini-2.0-flash",
system_prompt="""You are a customer support specialist.
Given the triage, look up relevant information and resolve the issue.
Be empathetic, clear, and provide specific action items with timelines.""",
tools=[lookup_order, lookup_customer, create_refund],
retry_policy=RetryPolicy(max_retries=2),
fallback_chain=FallbackChain(["anthropic/claude-haiku-4-5-20251001"]),
)
escalation_agent = Agent(
name="escalation",
model="anthropic/claude-haiku-4-5-20251001", # Claude for empathetic escalations
system_prompt="""You are a senior customer support specialist handling escalations.
Acknowledge the customer's frustration, take ownership, and provide a clear resolution path.
For complex cases, offer goodwill gestures appropriate to the customer's tier.""",
tools=[lookup_order, lookup_customer, create_refund],
loop_config=LoopConfig(
max_iterations=2,
convergence_fn=lambda r: "resolution" in r.text.lower() or "refund" in r.text.lower(),
refine_prompt="Ensure your response includes a specific resolution and timeline.",
),
)
orch = Orchestrator(
agents=[triage_agent, resolver_agent, escalation_agent],
router_model="gemini/gemini-2.0-flash-lite",
apex_model="gemini/gemini-2.0-flash",
apex_config=ApexConfig(cite_sources=False),
)
async def handle_ticket(customer_email: str, message: str) -> str:
context = {"customer_email": customer_email}
result = await orch.run(
f"Customer ({customer_email}): {message}",
context=context,
)
return result.final_text
asyncio.run(handle_ticket(
"jane@example.com",
"My order ORD-12345 was supposed to arrive yesterday but tracking shows it's still in transit. I need this for an event tomorrow!"
))
Medical Literature Review
Systematic literature review with PubMed integration, evidence grading, and structured clinical summary — using Claude for its strong long-context reasoning.
"""
Medical literature review using Anthropic Claude.
Claude excels at long-context reasoning and nuanced medical language.
DISCLAIMER: For research/educational use only. Not medical advice.
"""
import asyncio
from deepcrew import Agent, WorkflowBuilder, tool, LoopConfig, RetryPolicy
@tool
def search_pubmed(query: str, max_results: int = 10) -> list[dict]:
"""Search PubMed for medical literature.
Args:
query (str): Medical search query (supports MeSH terms).
max_results (int): Maximum number of results.
"""
import httpx
# Step 1: search
search_resp = httpx.get("https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi", params={
"db": "pubmed", "term": query, "retmax": max_results, "retmode": "json"
}).json()
ids = search_resp.get("esearchresult", {}).get("idlist", [])
if not ids:
return []
# Step 2: fetch summaries
summary_resp = httpx.get("https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi", params={
"db": "pubmed", "id": ",".join(ids), "retmode": "json"
}).json()
results = []
for uid in ids:
art = summary_resp.get("result", {}).get(uid, {})
results.append({
"pmid": uid,
"title": art.get("title", ""),
"journal": art.get("source", ""),
"year": art.get("pubdate", "")[:4],
"authors": art.get("authors", [{}])[0].get("name", "") + " et al." if art.get("authors") else "",
})
return results
@tool
def fetch_abstract(pmid: str) -> str:
"""Fetch the abstract for a PubMed article.
Args:
pmid (str): PubMed ID.
"""
import httpx
resp = httpx.get("https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi", params={
"db": "pubmed", "id": pmid, "rettype": "abstract", "retmode": "text"
})
return resp.text[:2000]
literature_searcher = Agent(
name="literature_searcher",
model="anthropic/claude-haiku-4-5-20251001", # fast + good for search queries
system_prompt="""You are a medical librarian. Search PubMed systematically using
multiple relevant MeSH terms and synonyms. Fetch abstracts for the most relevant papers.
Focus on: RCTs, systematic reviews, and meta-analyses published in last 5 years.""",
tools=[search_pubmed, fetch_abstract],
loop_config=LoopConfig(
max_iterations=3,
convergence_fn=lambda r: r.text.count("PMID") >= 5,
refine_prompt="You need more papers. Try different MeSH terms and synonyms. Aim for at least 8 relevant papers.",
),
retry_policy=RetryPolicy(max_retries=2),
)
evidence_grader = Agent(
name="evidence_grader",
model="anthropic/claude-sonnet-4-6",
system_prompt="""You are a clinical epidemiologist applying GRADE methodology.
For each paper, assess:
- Study design (RCT > cohort > case-control > expert opinion)
- Risk of bias (randomization, blinding, attrition)
- Consistency across studies
- Directness to the clinical question
- Precision (confidence intervals, sample size)
Grade overall evidence: High / Moderate / Low / Very Low.""",
)
clinical_summarizer = Agent(
name="clinical_summarizer",
model="anthropic/claude-opus-4-8", # strongest for nuanced medical synthesis
system_prompt="""You are a clinical researcher writing for a peer-reviewed journal.
Synthesize evidence into a structured review with:
- Background and clinical question
- Summary of evidence (organized by intervention/outcome)
- Evidence quality (GRADE ratings)
- Clinical implications and practice recommendations
- Limitations and knowledge gaps
- References (PMID citations)
Be precise with statistics (NNT, OR, HR, 95% CI).""",
max_tokens=4096,
)
workflow = (
WorkflowBuilder()
.add_agent("search", literature_searcher, task="{input}")
.add_agent("grade", evidence_grader, task="Grade evidence from:\n{search}")
.add_agent("synthesis", clinical_summarizer,
task="Write a clinical evidence summary for: {input}\n\nLiterature:\n{search}\n\nEvidence grades:\n{grade}")
.then("search", "grade")
.then("search", "synthesis")
.then("grade", "synthesis")
)
async def main():
result = await workflow.run(
"What is the efficacy of GLP-1 receptor agonists for non-alcoholic fatty liver disease?"
)
print(result.final_output.text)
asyncio.run(main())
E-Commerce Intelligence Platform
Product catalog enrichment, pricing optimization, and review analysis — with multi-provider fallback for high availability.
"""
E-commerce intelligence: product enrichment + pricing + sentiment analysis.
Uses multi-provider fallback for 99.9% uptime.
"""
import asyncio
from deepcrew import (
Agent, Orchestrator, ApexConfig, tool,
RetryPolicy, FallbackChain, InMemoryProvider, WebSearchSkill,
)
@tool
def get_product_data(product_id: str) -> dict:
"""Fetch product data from catalog.
Args:
product_id (str): Product SKU or ID.
"""
return {
"id": product_id,
"name": "Premium Wireless Headphones",
"category": "Electronics > Audio",
"price": 149.99,
"cost": 52.00,
"inventory": 234,
"rating": 4.3,
"review_count": 1847,
}
@tool
def get_competitor_prices(product_name: str) -> list[dict]:
"""Scrape competitor prices for a product.
Args:
product_name (str): Product name to search.
"""
return [
{"retailer": "Amazon", "price": 139.99, "in_stock": True},
{"retailer": "Best Buy", "price": 159.99, "in_stock": True},
{"retailer": "Walmart", "price": 134.99, "in_stock": False},
]
@tool
def fetch_reviews(product_id: str, count: int = 50) -> list[dict]:
"""Fetch customer reviews for a product.
Args:
product_id (str): Product ID.
count (int): Number of reviews to fetch.
"""
return [
{"rating": 5, "text": "Amazing sound quality, very comfortable.", "helpful": 45},
{"rating": 2, "text": "Battery life is disappointing, disconnects often.", "helpful": 32},
{"rating": 4, "text": "Great for the price, noise cancellation works well.", "helpful": 28},
]
# Fallback chain ensures high availability even if primary model is down
fallback = FallbackChain(["anthropic/claude-haiku-4-5-20251001", "gemini/gemini-2.0-flash"])
content_enricher = Agent(
name="content_enricher",
model="openai/gpt-4o",
system_prompt="""You are an e-commerce content specialist.
Enrich product listings with:
- SEO-optimized title and description
- Key features bullet points (5-7)
- Target customer persona
- Use case scenarios
- Cross-sell/upsell recommendations""",
tools=[get_product_data],
skills=[WebSearchSkill()],
retry_policy=RetryPolicy(max_retries=3),
fallback_chain=fallback,
)
pricing_optimizer = Agent(
name="pricing_optimizer",
model="openai/gpt-4o-mini",
system_prompt="""You are a pricing strategist.
Analyze competitor prices and our margins to recommend optimal pricing:
- Price point relative to competitors
- Expected demand elasticity impact
- Margin impact calculation
- Bundle pricing opportunities
- Dynamic pricing triggers (inventory level, seasonality)""",
tools=[get_product_data, get_competitor_prices],
retry_policy=RetryPolicy(max_retries=3),
fallback_chain=fallback,
)
review_analyst = Agent(
name="review_analyst",
model="gemini/gemini-2.0-flash",
system_prompt="""You are a customer insights analyst.
Analyze reviews to extract:
- Sentiment distribution and trend
- Top praise themes (what customers love)
- Top complaint themes (what to improve)
- Feature requests
- Competitor comparisons mentioned
- NPS-style scoring""",
tools=[fetch_reviews],
retry_policy=RetryPolicy(max_retries=3),
fallback_chain=FallbackChain(["openai/gpt-4o-mini"]),
)
orch = Orchestrator(
agents=[content_enricher, pricing_optimizer, review_analyst],
router_model="openai/gpt-4o-mini",
apex_model="openai/gpt-4o",
apex_config=ApexConfig(cite_sources=True, confidence_threshold=0.75),
)
async def enrich_product(product_id: str) -> str:
result = await orch.run(f"Provide full intelligence report for product {product_id}")
return result.final_text
asyncio.run(enrich_product("PROD-WH-001"))
DevOps Incident Response
AI-powered incident response: log analysis, root cause identification, and automated remediation — with full OpenTelemetry tracing and agent spawning for parallel diagnosis.
"""
DevOps incident response with agent spawning and OTel tracing.
Parent agent spawns specialized sub-agents for parallel root cause analysis.
"""
import asyncio
from deepcrew import (
Agent, Orchestrator, tool, ObservabilityConfig,
RetryPolicy, FallbackChain, spawn_agent, SpawnRequest,
)
obs = ObservabilityConfig(
otel_endpoint="http://otel-collector.internal:4317",
service_name="incident-response-ai",
)
@tool
def get_error_logs(service: str, minutes: int = 30, log_level: str = "ERROR") -> list[str]:
"""Fetch recent error logs from a service.
Args:
service (str): Service name (e.g., 'api-gateway', 'payment-service').
minutes (int): How many minutes of logs to fetch.
log_level (str): Log level filter: DEBUG/INFO/WARN/ERROR/CRITICAL.
"""
# In production: connect to CloudWatch, Datadog, or ELK
return [
f"[ERROR] {service}: Connection pool exhausted (pool_size=10, waiting=47)",
f"[ERROR] {service}: Timeout after 30s connecting to postgres-replica-2",
f"[CRITICAL] {service}: Circuit breaker OPEN for downstream payment-processor",
]
@tool
def get_metrics(service: str, metric: str, minutes: int = 30) -> dict:
"""Fetch service metrics from monitoring system.
Args:
service (str): Service name.
metric (str): Metric name (e.g., 'error_rate', 'p99_latency', 'cpu', 'memory').
minutes (int): Time window in minutes.
"""
return {
"service": service,
"metric": metric,
"current": 94.2 if metric == "error_rate" else 3450,
"baseline": 0.5 if metric == "error_rate" else 120,
"threshold": 5.0 if metric == "error_rate" else 2000,
"status": "critical",
}
@tool
def run_diagnostic_command(command: str) -> str:
"""Run a safe read-only diagnostic command.
Args:
command (str): Diagnostic command (kubectl describe, pg_stat_activity, etc.).
"""
import subprocess
# Whitelist only safe read-only commands
SAFE_PREFIXES = ["kubectl get", "kubectl describe", "kubectl logs", "pg_stat", "df ", "free "]
if not any(command.startswith(p) for p in SAFE_PREFIXES):
return f"Command blocked for safety: {command}"
try:
result = subprocess.run(command.split(), capture_output=True, text=True, timeout=10)
return result.stdout[:2000] or result.stderr[:500]
except Exception as e:
return str(e)
@tool
def create_incident_ticket(title: str, severity: str, description: str, assigned_team: str) -> dict:
"""Create an incident ticket in the ticketing system.
Args:
title (str): Incident title.
severity (str): P1/P2/P3/P4.
description (str): Detailed description.
assigned_team (str): Team to assign (sre, backend, database, network).
"""
return {"ticket_id": "INC-2847", "url": "https://incidents.example.com/INC-2847", "created": True}
# Main incident commander with spawning enabled
incident_commander = Agent(
name="incident_commander",
model="openai/gpt-4o",
system_prompt="""You are an SRE incident commander. Your job is to:
1. Quickly assess the scope and severity of the incident
2. Spawn specialized diagnostic agents for parallel root cause analysis
3. Coordinate findings into a clear timeline and root cause
4. Recommend immediate mitigations and long-term fixes
5. Create the incident ticket with full details
Use the spawn_agent tool to run parallel diagnostic investigations.""",
tools=[get_error_logs, get_metrics, create_incident_ticket],
retry_policy=RetryPolicy(max_retries=3),
fallback_chain=FallbackChain(["anthropic/claude-sonnet-4-6"]),
)
orch = Orchestrator(
agents=[incident_commander],
router_model="openai/gpt-4o-mini",
apex_model="openai/gpt-4o",
global_tools=[get_error_logs, get_metrics, run_diagnostic_command],
enable_spawn=True, # commander can spawn specialized sub-agents
)
async def respond_to_incident(alert: str) -> str:
result = await orch.run(alert)
return result.final_text
asyncio.run(respond_to_incident(
"CRITICAL ALERT: payment-service error rate 94.2% (threshold: 5%). "
"P99 latency 3450ms (baseline: 120ms). Started 15 minutes ago."
))
Content Marketing Pipeline
Draft → critique → refine content pipeline using looping for quality convergence, memory for brand consistency, and built-in skills for research and summarization.
"""
Content marketing pipeline:
- Looping for draft → critique → refine quality control
- FileMemoryProvider for brand voice consistency across runs
- Skills: WebSearch + Summarize for research-backed content
"""
import asyncio
from deepcrew import (
Agent, WorkflowBuilder, tool,
WebSearchSkill, SummarizeSkill,
LoopConfig, FileMemoryProvider,
RetryPolicy,
)
# Persist brand guidelines and past content across runs
brand_memory = FileMemoryProvider("~/.deepcrew/brand_memory.json")
@tool
def get_seo_keywords(topic: str, count: int = 10) -> list[dict]:
"""Get SEO keyword suggestions for a topic.
Args:
topic (str): Content topic.
count (int): Number of keyword suggestions.
"""
# In production: use Google Keyword Planner, Ahrefs, or SEMrush API
return [
{"keyword": f"{topic} guide", "volume": 12000, "difficulty": 42},
{"keyword": f"best {topic}", "volume": 8500, "difficulty": 55},
{"keyword": f"how to {topic}", "volume": 6200, "difficulty": 38},
]
@tool
def check_readability(text: str) -> dict:
"""Check content readability metrics.
Args:
text (str): Text to analyze.
"""
words = text.split()
sentences = text.split(".")
avg_words = len(words) / max(len(sentences), 1)
return {
"word_count": len(words),
"sentence_count": len(sentences),
"avg_words_per_sentence": round(avg_words, 1),
"estimated_flesch_grade": min(12, max(6, avg_words * 0.3)),
"reading_time_minutes": round(len(words) / 200, 1),
}
researcher = Agent(
name="researcher",
model="openai/gpt-4o-mini",
system_prompt="""You are a content researcher. Gather comprehensive, accurate information
on the topic using web search. Find statistics, expert quotes, and relevant examples.
Identify the top SEO keywords to target.""",
tools=[get_seo_keywords],
skills=[WebSearchSkill(), SummarizeSkill(model="openai/gpt-4o-mini")],
memory=brand_memory,
)
writer = Agent(
name="writer",
model="anthropic/claude-haiku-4-5-20251001",
system_prompt="""You are a content marketing writer. Write engaging, informative content
that: is SEO-optimized for the target keywords, follows the brand voice (check memory),
uses data and examples from the research, includes clear headers (H2, H3),
ends with a compelling CTA, and is free of fluff.""",
memory=brand_memory,
loop_config=LoopConfig(
max_iterations=3,
convergence_fn=lambda r: (
len(r.text.split()) > 600 and
"##" in r.text and
r.text.endswith(("action", "today", "now", "started", "free", "learn more"))
),
refine_prompt="""Your draft needs improvement. Ensure:
1. At least 700 words
2. Multiple H2/H3 headers for structure
3. At least 2 statistics or data points
4. A clear CTA in the final paragraph
Revise accordingly.""",
),
retry_policy=RetryPolicy(max_retries=2),
)
editor = Agent(
name="editor",
model="openai/gpt-4o",
system_prompt="""You are a senior content editor. Review drafts for:
- Factual accuracy (flag unsupported claims)
- SEO optimization (keyword density, meta description suggestion)
- Brand voice consistency
- Readability (suggest simplifications for complex sentences)
- Engagement (opening hook, flow, punchiness)
Provide specific line edits and an overall quality score (1-10).""",
tools=[check_readability],
)
social_media_writer = Agent(
name="social_writer",
model="gemini/gemini-2.0-flash",
system_prompt="""You are a social media expert. Create platform-specific posts from the article:
- Twitter/X: 3 tweet thread (280 chars each)
- LinkedIn: Professional long-form post with hashtags
- Instagram caption: Engaging, emoji-rich, with hashtags
Each should standalone while teasing the full article.""",
)
workflow = (
WorkflowBuilder()
.add_agent("research", researcher, task="Research topic for content: {input}")
.add_agent("draft", writer, task="Write a blog post about:\n\n{input}\n\nResearch:\n{research}")
.add_agent("edit", editor, task="Edit this draft:\n\n{draft}")
.add_agent("social", social_media_writer, task="Create social posts from:\n\n{draft}")
.then("research", "draft")
.then("draft", "edit")
.then("draft", "social")
)
async def create_content(topic: str):
result = await workflow.run(topic)
print("=== EDITED ARTICLE ===")
print(result.outputs["edit"].text)
print("\n=== SOCIAL MEDIA POSTS ===")
print(result.outputs["social"].text)
asyncio.run(create_content("10 Python async programming patterns every developer should know"))
Fully Local AI — No Cloud Required
Run a complete multi-agent pipeline with Ollama models on your local machine. Zero cost, complete data privacy, works air-gapped.
# Install Ollama (https://ollama.com)
# Then pull the models you want to use:
ollama pull llama3.2 # general-purpose, 3B params
ollama pull qwen2.5-coder:7b # code-optimized, 7B params
ollama pull phi3.5 # Microsoft's small but capable model
ollama pull mistral # great for analysis
"""
Fully local multi-agent pipeline using Ollama.
No API keys, no cloud calls, works offline.
Prerequisites: Ollama running at http://localhost:11434
"""
import asyncio
from deepcrew import Agent, Orchestrator, ApexConfig, tool, LoopConfig
from deepcrew import CodeExecutionSkill
@tool
def read_file(path: str) -> str:
"""Read a local file.
Args:
path (str): File path to read.
"""
try:
with open(path, "r", encoding="utf-8") as f:
return f.read()[:5000]
except FileNotFoundError:
return f"File not found: {path}"
@tool
def list_files(directory: str, extension: str = "") -> list[str]:
"""List files in a directory.
Args:
directory (str): Directory path.
extension (str): File extension filter (e.g., '.py', '.md').
"""
import os
try:
files = os.listdir(directory)
if extension:
files = [f for f in files if f.endswith(extension)]
return files[:50]
except Exception as e:
return [str(e)]
# General-purpose reasoning agent
analyst = Agent(
name="analyst",
model="ollama/llama3.2",
system_prompt="You are a helpful analyst. Reason carefully and provide well-structured answers.",
tools=[read_file, list_files],
)
# Code-specialized agent
coder = Agent(
name="coder",
model="ollama/qwen2.5-coder:7b",
system_prompt="You are an expert programmer. Write clean, efficient, well-documented code.",
tools=[read_file, list_files],
skills=[CodeExecutionSkill(timeout=15.0)],
loop_config=LoopConfig(
max_iterations=3,
convergence_fn=lambda r: "def " in r.text or "class " in r.text,
refine_prompt="Your response doesn't include actual code. Please write the implementation.",
),
)
# Writing agent
writer = Agent(
name="writer",
model="ollama/phi3.5",
system_prompt="You are a technical writer. Write clear, concise documentation.",
)
orch = Orchestrator(
agents=[analyst, coder, writer],
router_model="ollama/llama3.2", # local router too
apex_model="ollama/mistral", # local synthesis
apex_config=ApexConfig(cite_sources=False), # simpler for local models
)
async def main():
# Local code analysis
result = await orch.run(
"Analyze the current directory structure, identify Python files, "
"and suggest how to add type hints to improve code quality."
)
print(result.final_text)
asyncio.run(main())
Local-only workflow.yaml
agents:
- name: researcher
model: ollama/llama3.2
system_prompt: Research and gather information thoroughly.
tools: [web_search]
- name: writer
model: ollama/phi3.5
system_prompt: Write clear and well-structured content.
workflow:
- step: research
agent: researcher
task: "{input}"
- step: article
agent: writer
task: "Write about:\n{research}"
depends_on: [research]
deepcrew run local_workflow.yaml --input "Explain how neural networks learn"
Real-Time Streaming API
Complete FastAPI backend with Server-Sent Events streaming, plus a React hook for consuming the stream in the frontend.
"""
FastAPI streaming server with deepcrew-ai.
Streams events to the browser as SSE.
"""
import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from deepcrew import Agent, Orchestrator, ApexConfig, tool, ObservabilityConfig
from deepcrew import RetryPolicy, FallbackChain, WebSearchSkill
app = FastAPI(title="deepcrew AI API", version="0.2.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_methods=["*"],
allow_headers=["*"],
)
obs = ObservabilityConfig(
otel_endpoint="http://localhost:4317",
service_name="deepcrew-api",
)
@tool
def search_web(query: str) -> str:
"Search the web for current information."
import httpx
r = httpx.get(
f"https://api.duckduckgo.com/?q={query}&format=json&no_redirect=1"
)
data = r.json()
return data.get("AbstractText") or ", ".join(
r.get("Text", "") for r in data.get("RelatedTopics", [])[:3]
) or "No results found."
def build_orchestrator() -> Orchestrator:
agents = [
Agent(
name="researcher",
model="openai/gpt-4o-mini",
system_prompt="Research the topic thoroughly.",
tools=[search_web],
skills=[WebSearchSkill()],
retry_policy=RetryPolicy(max_retries=2),
fallback_chain=FallbackChain(["anthropic/claude-haiku-4-5-20251001"]),
),
Agent(
name="analyst",
model="anthropic/claude-haiku-4-5-20251001",
system_prompt="Analyze and provide critical insights.",
retry_policy=RetryPolicy(max_retries=2),
fallback_chain=FallbackChain(["gemini/gemini-2.0-flash"]),
),
]
return Orchestrator(
agents=agents,
router_model="openai/gpt-4o-mini",
apex_model="openai/gpt-4o",
apex_config=ApexConfig(cite_sources=True, confidence_threshold=0.75),
)
orch = build_orchestrator()
class QueryRequest(BaseModel):
query: str
stream: bool = True
@app.post("/v1/query")
async def query_endpoint(req: QueryRequest):
if not req.query.strip():
raise HTTPException(status_code=400, detail="Query cannot be empty")
if req.stream:
async def event_generator():
try:
async for event in orch.stream(req.query):
yield event.to_sse()
except Exception as e:
import json
yield f"event: error\ndata: {json.dumps({'message': str(e)})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Connection": "keep-alive",
},
)
else:
result = await orch.run(req.query)
return {
"text": result.final_text,
"confidence": result.agent_results[-1].confidence if result.agent_results else None,
"total_tokens": result.total_tokens,
}
@app.get("/health")
async def health():
return {"status": "ok", "version": "0.2.0"}
// React hook for consuming deepcrew SSE streams
import { useState, useCallback, useRef } from "react";
interface DeepCrewEvent {
event: string;
agent_id: string;
data: Record<string, unknown>;
}
interface UseDeepCrewOptions {
apiUrl?: string;
}
export function useDeepCrew({ apiUrl = "http://localhost:8000" }: UseDeepCrewOptions = {}) {
const [text, setText] = useState("");
const [confidence, setConfidence] = useState<number | null>(null);
const [isStreaming, setIsStreaming] = useState(false);
const [error, setError] = useState<string | null>(null);
const [events, setEvents] = useState<DeepCrewEvent[]>([]);
const abortRef = useRef<AbortController | null>(null);
const query = useCallback(async (q: string) => {
abortRef.current?.abort();
abortRef.current = new AbortController();
setText(""); setConfidence(null); setError(null); setEvents([]);
setIsStreaming(true);
try {
const res = await fetch(`${apiUrl}/v1/query`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ query: q, stream: true }),
signal: abortRef.current.signal,
});
if (!res.ok) throw new Error(`HTTP ${res.status}`);
const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n\n");
buffer = lines.pop() ?? "";
for (const block of lines) {
const eventLine = block.match(/^event: (.+)$/m)?.[1];
const dataLine = block.match(/^data: (.+)$/m)?.[1];
if (!dataLine) continue;
const data = JSON.parse(dataLine) as DeepCrewEvent["data"];
const ev: DeepCrewEvent = { event: eventLine ?? "message", agent_id: data.agent_id as string ?? "", data };
setEvents(prev => [...prev, ev]);
if (ev.event === "text_delta") {
setText(prev => prev + (data.chunk as string ?? ""));
} else if (ev.event === "apex_done") {
setConfidence(data.confidence as number ?? null);
} else if (ev.event === "error") {
setError(data.message as string ?? "Unknown error");
}
}
}
} catch (e: unknown) {
if ((e as Error).name !== "AbortError") setError((e as Error).message);
} finally {
setIsStreaming(false);
}
}, [apiUrl]);
const stop = useCallback(() => abortRef.current?.abort(), []);
return { text, confidence, isStreaming, error, events, query, stop };
}
import React, { useState } from "react";
import { useDeepCrew } from "./useDeepCrew";
export default function ChatUI() {
const [input, setInput] = useState("");
const { text, confidence, isStreaming, error, query, stop } = useDeepCrew();
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (input.trim()) query(input);
};
return (
<div className="chat-container">
<div className="response-area">
{text && <p className="response-text">{text}</p>}
{confidence !== null && (
<div className="confidence-badge">
Confidence: {(confidence * 100).toFixed(0)}%
<div className="confidence-bar" style={{ width: `${confidence * 100}%` }} />
</div>
)}
{error && <p className="error">{error}</p>}
{isStreaming && <div className="typing-indicator">...</div>}
</div>
<form onSubmit={handleSubmit} className="input-form">
<input
value={input}
onChange={e => setInput(e.target.value)}
placeholder="Ask anything..."
disabled={isStreaming}
/>
{isStreaming
? <button type="button" onClick={stop}>Stop</button>
: <button type="submit">Send</button>
}
</form>
</div>
);
}
# Start the FastAPI server
pip install fastapi uvicorn deepcrew-ai
uvicorn server:app --reload --port 8000
# Test with curl
curl -X POST http://localhost:8000/v1/query \
-H "Content-Type: application/json" \
-d '{"query": "What is the James Webb Space Telescope discovering?", "stream": true}'