Examples Library

Real-world examples
across every domain.

Complete, runnable examples for legal, financial, medical, DevOps, e-commerce, and more — using OpenAI, Anthropic, Gemini, Bedrock, Azure, and local Ollama models.

LLM Providers

Provider Quick Reference

deepcrew-ai uses LiteLLM for all LLM calls — pass any provider's model string to Agent(model=...).

providers.py
import os

# Each section below sets the environment variable(s) for one provider and then shows
# example Agent constructions using "<provider>/<model>" model strings.
# NOTE: all key values are placeholders. `Agent` is not imported in this snippet —
# presumably `from deepcrew import Agent`, as in the other examples; confirm.

# ── OpenAI ──────────────────────────────────────────────────────────────────
os.environ["OPENAI_API_KEY"] = "sk-..."
Agent("a", model="openai/gpt-4o")
Agent("a", model="openai/gpt-4o-mini")
Agent("a", model="openai/o3")                     # reasoning model

# ── Anthropic ────────────────────────────────────────────────────────────────
os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
Agent("a", model="anthropic/claude-opus-4-8")
Agent("a", model="anthropic/claude-sonnet-4-6")
Agent("a", model="anthropic/claude-haiku-4-5-20251001")

# ── Google Gemini ─────────────────────────────────────────────────────────────
os.environ["GEMINI_API_KEY"] = "AIza..."
Agent("a", model="gemini/gemini-2.5-pro")
Agent("a", model="gemini/gemini-2.0-flash")
Agent("a", model="gemini/gemini-2.0-flash-lite")

# ── AWS Bedrock ───────────────────────────────────────────────────────────────
# Three AWS variables are set here: access key id, secret access key, and default region.
os.environ["AWS_ACCESS_KEY_ID"] = "..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
Agent("a", model="bedrock/anthropic.claude-opus-4-8-20250514-v1:0")
Agent("a", model="bedrock/amazon.nova-pro-v1:0")
Agent("a", model="bedrock/meta.llama3-70b-instruct-v1:0")

# ── Azure OpenAI ──────────────────────────────────────────────────────────────
# NOTE(review): with LiteLLM the part after "azure/" is normally the Azure *deployment*
# name rather than the model name — confirm these match your deployments.
os.environ["AZURE_API_KEY"] = "..."
os.environ["AZURE_API_BASE"] = "https://YOUR-RESOURCE.openai.azure.com"
os.environ["AZURE_API_VERSION"] = "2024-10-21"
Agent("a", model="azure/gpt-4o")
Agent("a", model="azure/gpt-4o-mini")

# ── Groq (fast inference) ─────────────────────────────────────────────────────
os.environ["GROQ_API_KEY"] = "gsk_..."
Agent("a", model="groq/llama-3.3-70b-versatile")
Agent("a", model="groq/mixtral-8x7b-32768")

# ── Cohere ────────────────────────────────────────────────────────────────────
os.environ["COHERE_API_KEY"] = "..."
Agent("a", model="cohere/command-r-plus")

# ── Mistral ───────────────────────────────────────────────────────────────────
os.environ["MISTRAL_API_KEY"] = "..."
Agent("a", model="mistral/mistral-large-latest")
Agent("a", model="mistral/codestral-latest")

# ── Local via Ollama ──────────────────────────────────────────────────────────
# (no API key needed — run: ollama pull llama3.2)
Agent("a", model="ollama/llama3.2")
Agent("a", model="ollama/qwen2.5-coder:7b")
Agent("a", model="ollama/phi3.5")
Agent("a", model="ollama/mistral")

Provider comparison

Provider | Best for | Env var
OpenAI GPT-4o | General tasks, code, reasoning | OPENAI_API_KEY
Anthropic Claude | Long context, nuanced writing, safety | ANTHROPIC_API_KEY
Google Gemini | Multimodal, long context, speed | GEMINI_API_KEY
AWS Bedrock | Regulated industries, no data egress | AWS_*
Azure OpenAI | Enterprise compliance, SLA guarantees | AZURE_API_*
Groq | Ultra-low latency inference | GROQ_API_KEY
Ollama | Fully local, air-gapped, no cost | None
Domain · OpenAI + Anthropic + Gemini

Multi-Model Research Pipeline

A three-agent research pipeline where each agent uses a different provider, with APEX synthesis and source citation.

research_pipeline.py
"""
Multi-model research pipeline.
Providers: OpenAI (researcher) + Anthropic (analyst) + Gemini (writer) → OpenAI APEX
"""
import asyncio
from deepcrew import Agent, Orchestrator, ApexConfig, tool
from deepcrew import WebSearchSkill, LoopConfig, RetryPolicy

@tool
def search_arxiv(query: str, max_results: int = 5) -> list[dict]:
    """Search arXiv for academic papers.

    Args:
        query (str): Search query.
        max_results (int): Maximum results.
    """
    # Returns one {"title", "summary"} dict per <entry> in the Atom feed; the
    # summary is truncated to 200 characters.
    import re

    import httpx

    resp = httpx.get(
        "https://export.arxiv.org/api/query",
        params={"search_query": f"all:{query}", "max_results": max_results},
    )

    def _first(tag: str, fragment: str) -> str:
        # Text of the first <tag>…</tag> in `fragment`, whitespace-collapsed
        # (arXiv wraps long titles/abstracts across lines); "" when absent.
        found = re.search(rf"<{tag}[^>]*>(.*?)</{tag}>", fragment, re.DOTALL)
        return " ".join(found.group(1).split()) if found else ""

    # Parse Atom XML (simplified). The patterns must name the elements: a bare
    # r"(.*?)" only ever matches empty strings. Working entry-by-entry also
    # keeps each title paired with its own summary and sidesteps the feed-level
    # <title>, which lives outside any <entry>.
    entries = re.findall(r"<entry>(.*?)</entry>", resp.text, re.DOTALL)
    return [
        {"title": _first("title", entry), "summary": _first("summary", entry)[:200]}
        for entry in entries
    ]

@tool
def fetch_wikipedia(topic: str) -> str:
    """Fetch Wikipedia article summary for a topic.

    Args:
        topic (str): Topic to look up.
    """
    # Returns at most the first 1000 characters of the "extract" field, or
    # "Not found." when the response JSON has no such field.
    from urllib.parse import quote

    import httpx

    # Percent-encode the title so characters such as "/", "?", "#" or "%" in the
    # topic stay inside a single path segment instead of altering the URL.
    title = quote(topic.replace(" ", "_"), safe="")
    resp = httpx.get(
        "https://en.wikipedia.org/api/rest_v1/page/summary/" + title,
        # httpx does not follow redirects by default; a redirecting title would
        # otherwise yield a body-less response and make .json() fail.
        follow_redirects=True,
    )
    data = resp.json()
    return data.get("extract", "Not found.")[:1000]

# Three agents, each on a different provider model, handed to one Orchestrator.

# Researcher: the only agent given tools (arXiv + Wikipedia) and a web-search skill.
# The loop config's convergence check passes once the result text exceeds 600 characters.
# NOTE(review): presumably the agent is re-prompted with `refine_prompt` until the check
# passes, up to `max_iterations` times — confirm against the LoopConfig documentation.
researcher = Agent(
    name="researcher",
    model="openai/gpt-4o-mini",
    system_prompt="You are an expert researcher. Use all tools to gather comprehensive, accurate information. Prioritize recent sources.",
    tools=[search_arxiv, fetch_wikipedia],
    skills=[WebSearchSkill()],
    retry_policy=RetryPolicy(max_retries=2),
    loop_config=LoopConfig(
        max_iterations=3,
        convergence_fn=lambda r: len(r.text) > 600,
        refine_prompt="Your research is thin. Search for more specific papers and sources.",
    ),
)

# Analyst: no tools; configured with a retry policy only.
analyst = Agent(
    name="analyst",
    model="anthropic/claude-haiku-4-5-20251001",
    system_prompt="You are a critical analyst. Evaluate the research, identify patterns, assess reliability, and highlight key insights and limitations.",
    retry_policy=RetryPolicy(max_retries=2),
)

# Writer: no tools, no retry policy.
writer = Agent(
    name="writer",
    model="gemini/gemini-2.0-flash",
    system_prompt="You are a science communicator. Write clear, engaging summaries that are accurate and accessible to a general audience.",
)

# The orchestrator uses separate models for routing and for the final (APEX) step;
# max_parallel_agents equals the number of agents registered here.
orch = Orchestrator(
    agents=[researcher, analyst, writer],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(cite_sources=True, confidence_threshold=0.75),
    max_parallel_agents=3,
)

async def main():
    """Run the research orchestrator on a sample question and print a short report."""
    question = "What are the latest breakthroughs in room-temperature superconductivity?"
    outcome = await orch.run(question)
    print(outcome.final_text)
    # Confidence is read from the last entry of the per-agent results.
    final_agent_result = outcome.agent_results[-1]
    print(f"\nConfidence: {final_agent_result.confidence:.2%}")
    print(f"Total tokens: {outcome.total_tokens:,}")

asyncio.run(main())
Domain · Anthropic + OpenAI + Groq

Automated Code Review

Multi-agent code review pipeline: security scanning, performance analysis, and style checking — each with a specialized model.

code_review.py
"""
Automated code review using specialized models per concern.
Claude for security (strong reasoning), Groq for speed on style checks.
"""
import asyncio
from deepcrew import Agent, Orchestrator, ApexConfig, RetryPolicy, FallbackChain, tool
from deepcrew import CodeExecutionSkill

@tool
def run_linter(code: str, language: str = "python") -> dict:
    """Run a static linter on code.

    Args:
        code (str): Source code to lint.
        language (str): Programming language.
    """
    # Only Python is checked (byte-compilation via py_compile); any other
    # language returns an empty issue list. For Python the result is
    # {"syntax_ok": bool, "errors": <py_compile stderr>}.
    import os
    import subprocess
    import sys
    import tempfile

    if language != "python":
        return {"issues": [], "language": language}
    # Write as UTF-8 (Python's default source encoding) rather than the locale
    # encoding, so non-ASCII source is not mangled or rejected on write.
    with tempfile.NamedTemporaryFile(
        suffix=".py", mode="w", encoding="utf-8", delete=False
    ) as f:
        f.write(code)
        fname = f.name
    try:
        result = subprocess.run(
            # Use the running interpreter: a bare "python" may be missing from
            # PATH or point at a different version.
            [sys.executable, "-m", "py_compile", fname],
            capture_output=True, text=True, timeout=10
        )
        return {"syntax_ok": result.returncode == 0, "errors": result.stderr}
    finally:
        # delete=False above means the temp file must be removed explicitly.
        os.unlink(fname)

@tool
def check_dependencies(requirements_txt: str) -> list[dict]:
    """Check dependencies for known vulnerabilities.

    Args:
        requirements_txt (str): Content of requirements.txt
    """
    # In production: use pip-audit or safety API
    # Placeholder: every requirement line is reported as "ok" with no vulnerabilities.
    report = []
    for raw_line in requirements_txt.splitlines():
        # Drop inline comments (" # ...") and surrounding whitespace.
        requirement = raw_line.split(" #", 1)[0].strip()
        # Skip blank lines and full-line comments so they are not listed as packages.
        if not requirement or requirement.startswith("#"):
            continue
        report.append({"package": requirement, "status": "ok", "vulnerabilities": []})
    return report

# Three reviewers, one per concern (security, performance, style), each on a
# different provider model, combined by a single Orchestrator.

# Security: the only reviewer with tools; falls back to a Haiku model via FallbackChain.
security_auditor = Agent(
    name="security_auditor",
    model="anthropic/claude-opus-4-8",   # Claude has strong security reasoning
    system_prompt="""You are a senior security engineer. Audit code for:
    - Injection vulnerabilities (SQL, command, LDAP)
    - Authentication/authorization flaws
    - Insecure data handling (secrets, PII)
    - Dependency vulnerabilities
    - Cryptographic weaknesses
    Be specific: cite line numbers and provide remediation code.""",
    tools=[check_dependencies, run_linter],
    retry_policy=RetryPolicy(max_retries=2),
    fallback_chain=FallbackChain(["anthropic/claude-haiku-4-5-20251001"]),
    temperature=0.1,
)

# Performance: given a code-execution skill with a 10.0 timeout value.
performance_analyst = Agent(
    name="performance_analyst",
    model="openai/gpt-4o",
    system_prompt="""You are a performance engineering expert. Identify:
    - O(n²) or worse algorithmic complexity
    - Unnecessary I/O in loops
    - Missing caching opportunities
    - Memory leaks or excessive allocations
    - Blocking calls in async contexts
    Provide Big-O analysis and optimization suggestions.""",
    skills=[CodeExecutionSkill(timeout=10.0)],  # can run benchmarks
    temperature=0.2,
)

# Style: prompt-only agent, no tools or skills.
style_checker = Agent(
    name="style_checker",
    model="groq/llama-3.3-70b-versatile",   # Groq is very fast for style checks
    system_prompt="""You are a code quality reviewer. Check for:
    - Naming conventions and clarity
    - Function/class size and single responsibility
    - Documentation completeness
    - Test coverage gaps
    - Code duplication
    Be constructive and provide specific improvement suggestions.""",
    temperature=0.3,
)

# Routing and final synthesis use OpenAI models regardless of the reviewers' providers.
orch = Orchestrator(
    agents=[security_auditor, performance_analyst, style_checker],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(
        cite_sources=True,
        confidence_threshold=0.8,
    ),
)

async def review_code(code: str) -> str:
    """Submit `code` to the review orchestrator and return the final review text.

    Args:
        code: Python source to review; it is embedded in a fenced code block
            inside the prompt.

    Returns:
        The orchestrator result's `final_text`.
    """
    result = await orch.run(f"Review this code:\n\n```python\n{code}\n```")
    return result.final_text

# Deliberately flawed sample input: the SQL query is built with an f-string from
# `username`, and process_data uses a nested loop over the same list.
CODE_TO_REVIEW = """
import sqlite3
import os

def get_user(username):
    conn = sqlite3.connect('users.db')
    cursor = conn.cursor()
    query = f"SELECT * FROM users WHERE username = '{username}'"
    cursor.execute(query)
    return cursor.fetchall()

def process_data(items):
    results = []
    for item in items:
        for other in items:  # O(n^2)
            if item['id'] != other['id']:
                results.append(item['value'] * other['value'])
    return results
"""

# asyncio.run returns the coroutine's result; print it so running the example
# actually shows the review instead of silently discarding it.
print(asyncio.run(review_code(CODE_TO_REVIEW)))
Domain · AWS Bedrock

Financial Analysis Pipeline

Financial data analysis running entirely on AWS Bedrock — no data leaves your VPC. Covers earnings analysis, risk assessment, and investment thesis generation.

financial_analysis.py
"""
Financial analysis on AWS Bedrock — data stays in your VPC.
Requires: AWS credentials with Bedrock model access.
"""
import asyncio
import os
from deepcrew import Agent, WorkflowBuilder, tool, RetryPolicy, InMemoryProvider

# Pin the AWS region for this process. This is a plain assignment, so it overwrites
# any AWS_DEFAULT_REGION value already present in the environment.
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

@tool
def fetch_financials(ticker: str, period: str = "annual") -> dict:
    """Fetch financial statements for a stock ticker.

    Args:
        ticker (str): Stock ticker symbol (e.g., 'AAPL', 'MSFT').
        period (str): 'annual' or 'quarterly'.
    """
    # In production: use yfinance, Alpha Vantage, or SEC EDGAR API
    # Placeholder data: only `ticker` is echoed back; `period` is not consulted.
    three_period_series = {
        "revenue": [100_000_000, 115_000_000, 132_000_000],
        "net_income": [15_000_000, 18_000_000, 22_000_000],
        "eps": [2.50, 3.10, 3.75],
    }
    point_in_time = {
        "debt_to_equity": 0.45,
        "free_cash_flow": 28_000_000,
    }
    return {"ticker": ticker, **three_period_series, **point_in_time}

@tool
def get_market_data(ticker: str) -> dict:
    """Get current market data and valuation metrics.

    Args:
        ticker (str): Stock ticker symbol.
    """
    # Placeholder snapshot: the same figures are returned for every ticker.
    quote_fields = {"price": 185.50, "52w_high": 201.00, "52w_low": 142.00}
    snapshot = {
        "price": quote_fields["price"],
        "pe_ratio": 28.5,
        "ps_ratio": 7.2,
        "52w_high": quote_fields["52w_high"],
        "52w_low": quote_fields["52w_low"],
        "market_cap": 2_900_000_000_000,
    }
    return snapshot

@tool
def fetch_news_sentiment(ticker: str, days: int = 30) -> dict:
    """Fetch recent news sentiment for a stock.

    Args:
        ticker (str): Stock ticker symbol.
        days (int): Number of days of history.
    """
    # Placeholder sentiment: neither `ticker` nor `days` influences the result.
    topics = ["AI products", "strong earnings", "market expansion"]
    sentiment = {
        "overall_sentiment": "positive",
        "sentiment_score": 0.72,
        "positive_articles": 45,
        "negative_articles": 12,
    }
    sentiment["key_topics"] = topics
    return sentiment

# In-memory provider; in this example it is attached only to the fundamental analyst.
memory = InMemoryProvider()

# All four agents below use "bedrock/..." model strings.

# Fundamentals: has the financial-statement and market-data tools, plus retries with backoff.
fundamental_analyst = Agent(
    name="fundamental_analyst",
    model="bedrock/anthropic.claude-opus-4-8-20250514-v1:0",
    system_prompt="""You are a CFA-level fundamental analyst.
    Analyze revenue growth, margins, cash generation, and balance sheet health.
    Compute key ratios and compare to industry averages.
    Identify earnings quality issues or accounting concerns.""",
    tools=[fetch_financials, get_market_data],
    memory=memory,
    retry_policy=RetryPolicy(max_retries=3, backoff_seconds=2.0),
)

# Sentiment: has the news-sentiment tool only.
sentiment_analyst = Agent(
    name="sentiment_analyst",
    model="bedrock/amazon.nova-pro-v1:0",
    system_prompt="""You are a market sentiment specialist.
    Analyze news flow, social sentiment, and management guidance.
    Identify catalysts (positive and negative) and their probability-weighted impact.""",
    tools=[fetch_news_sentiment],
)

# Risk: prompt-only; its inputs arrive through the workflow task template below.
risk_manager = Agent(
    name="risk_manager",
    model="bedrock/meta.llama3-70b-instruct-v1:0",
    system_prompt="""You are a portfolio risk manager.
    Given fundamental and sentiment analysis, assess:
    - Valuation risk (over/undervalued by how much?)
    - Business model risks (competition, regulation, technology)
    - Macro risks (rates, FX, recession sensitivity)
    - Position sizing recommendation (% of portfolio)""",
)

# Thesis writer: prompt-only; note the variable name differs from the agent's name="thesis_writer".
investment_thesis_writer = Agent(
    name="thesis_writer",
    model="bedrock/anthropic.claude-opus-4-8-20250514-v1:0",
    system_prompt="""You are an investment strategist writing for a hedge fund IC memo.
    Write a structured investment thesis with: Executive Summary, Bull/Bear cases,
    Price Target (1-year), Catalysts, Key Risks, and a clear Buy/Hold/Sell recommendation.""",
)

# Workflow graph. Declared edges: fundamentals -> risk, sentiment -> risk, and all
# three of fundamentals/sentiment/risk -> thesis. No edge links fundamentals and sentiment.
# NOTE(review): the {input}/{fundamentals}/{sentiment}/{risk} placeholders are presumably
# filled from the run input and the named steps' outputs — confirm against WorkflowBuilder docs.
workflow = (
    WorkflowBuilder()
    .add_agent("fundamentals", fundamental_analyst, task="Analyze the fundamentals of {input}")
    .add_agent("sentiment",    sentiment_analyst,    task="Analyze market sentiment for {input}")
    .add_agent("risk",         risk_manager,
               task="Assess risks for {input}.\n\nFundamentals:\n{fundamentals}\n\nSentiment:\n{sentiment}")
    .add_agent("thesis",       investment_thesis_writer,
               task="Write an investment thesis for {input}.\n\nFundamentals:\n{fundamentals}\n\nSentiment:\n{sentiment}\n\nRisk assessment:\n{risk}")
    .then("fundamentals", "risk")
    .then("sentiment",    "risk")
    .then("fundamentals", "thesis")
    .then("sentiment",    "thesis")
    .then("risk",         "thesis")
)

async def main():
    """Run the financial workflow for one ticker and print the final output text."""
    ticker = "NVDA"   # Analyze Nvidia
    run_result = await workflow.run(ticker)
    thesis_text = run_result.final_output.text
    print(thesis_text)

asyncio.run(main())
Domain · Gemini Flash (low latency)

Customer Support Automation

High-volume customer support triage and response using Gemini Flash for low-latency classification, with Anthropic Claude handling complex escalations.

customer_support.py
"""
Customer support pipeline:
- Gemini Flash for fast triage/classification (low latency, high volume)
- Claude for complex, empathetic escalation responses
- Memory to track customer history
"""
import asyncio
from deepcrew import (
    Agent, Orchestrator, ApexConfig, tool,
    FileMemoryProvider, RetryPolicy, FallbackChain, LoopConfig,
)

@tool
def lookup_order(order_id: str) -> dict:
    """Look up order status and details.

    Args:
        order_id (str): The order ID (format: ORD-XXXXX).
    """
    # Placeholder record: only `order_id` is echoed back; everything else is fixed.
    line_items = [{"name": "Widget Pro", "qty": 2, "price": 49.99}]
    order = {"order_id": order_id}
    order.update({
        "status": "shipped",
        "shipped_date": "2026-06-25",
        "estimated_delivery": "2026-07-01",
        "items": line_items,
        "carrier": "FedEx",
        "tracking": "1234567890",
    })
    return order

@tool
def lookup_customer(email: str) -> dict:
    """Look up customer account and history.

    Args:
        email (str): Customer email address.
    """
    # Placeholder profile: only `email` is echoed back; everything else is fixed.
    profile = {"email": email}
    profile.update({
        "name": "Jane Smith",
        "tier": "premium",
        "lifetime_orders": 47,
        "account_age_days": 820,
        "open_tickets": 0,
    })
    return profile

@tool
def create_refund(order_id: str, reason: str, amount: float | None = None) -> dict:
    """Create a refund for an order.

    Args:
        order_id (str): Order to refund.
        reason (str): Refund reason.
        amount (float): Amount to refund (None = full refund).
    """
    # Placeholder: always approves; `reason` is accepted but not recorded.
    # Compare against None explicitly: `amount or "full"` would turn an explicit
    # 0.0 into a full refund, contradicting the documented "None = full refund".
    refund_amount = "full" if amount is None else amount
    return {"refund_id": f"REF-{order_id}", "status": "approved", "amount": refund_amount}

# File-backed memory, attached only to the triage agent below.
# NOTE(review): the path starts with "~"; whether FileMemoryProvider expands it is not
# visible here — confirm, or pass an expanded path.
customer_memory = FileMemoryProvider("~/.deepcrew/support_memory.json")

# Triage: classification only — no tools.
triage_agent = Agent(
    name="triage",
    model="gemini/gemini-2.0-flash-lite",  # fastest/cheapest for classification
    system_prompt="""You are a customer support triage agent.
    Classify the ticket and extract key information:
    - Category: billing/shipping/technical/returns/general
    - Urgency: low/medium/high/critical
    - Customer sentiment: positive/neutral/frustrated/angry
    - Key entities: order IDs, product names, dates""",
    memory=customer_memory,
)

# Resolver: has the order/customer/refund tools; falls back to a Claude Haiku model.
resolver_agent = Agent(
    name="resolver",
    model="gemini/gemini-2.0-flash",
    system_prompt="""You are a customer support specialist.
    Given the triage, look up relevant information and resolve the issue.
    Be empathetic, clear, and provide specific action items with timelines.""",
    tools=[lookup_order, lookup_customer, create_refund],
    retry_policy=RetryPolicy(max_retries=2),
    fallback_chain=FallbackChain(["anthropic/claude-haiku-4-5-20251001"]),
)

# Escalation: same tools as the resolver. Its convergence check passes when the
# response text contains "resolution" or "refund" (case-insensitive).
escalation_agent = Agent(
    name="escalation",
    model="anthropic/claude-haiku-4-5-20251001",  # Claude for empathetic escalations
    system_prompt="""You are a senior customer support specialist handling escalations.
    Acknowledge the customer's frustration, take ownership, and provide a clear resolution path.
    For complex cases, offer goodwill gestures appropriate to the customer's tier.""",
    tools=[lookup_order, lookup_customer, create_refund],
    loop_config=LoopConfig(
        max_iterations=2,
        convergence_fn=lambda r: "resolution" in r.text.lower() or "refund" in r.text.lower(),
        refine_prompt="Ensure your response includes a specific resolution and timeline.",
    ),
)

# Router and APEX both use Gemini Flash models; source citation is turned off.
orch = Orchestrator(
    agents=[triage_agent, resolver_agent, escalation_agent],
    router_model="gemini/gemini-2.0-flash-lite",
    apex_model="gemini/gemini-2.0-flash",
    apex_config=ApexConfig(cite_sources=False),
)

async def handle_ticket(customer_email: str, message: str) -> str:
    """Route one customer message through the support orchestrator.

    Args:
        customer_email: Sender's email; included in the prompt and passed in the
            run context under "customer_email".
        message: The customer's message text.

    Returns:
        The orchestrator result's `final_text`.
    """
    context = {"customer_email": customer_email}
    result = await orch.run(
        f"Customer ({customer_email}): {message}",
        context=context,
    )
    return result.final_text

# asyncio.run returns the coroutine's result; print it so running the example
# shows the generated reply instead of silently discarding it.
print(asyncio.run(handle_ticket(
    "jane@example.com",
    "My order ORD-12345 was supposed to arrive yesterday but tracking shows it's still in transit. I need this for an event tomorrow!"
)))
Domain · Anthropic Claude (strong reasoning)

Medical Literature Review

Systematic literature review with PubMed integration, evidence grading, and structured clinical summary — using Claude for its strong long-context reasoning.

medical_literature.py
"""
Medical literature review using Anthropic Claude.
Claude excels at long-context reasoning and nuanced medical language.

DISCLAIMER: For research/educational use only. Not medical advice.
"""
import asyncio
from deepcrew import Agent, WorkflowBuilder, tool, LoopConfig, RetryPolicy

@tool
def search_pubmed(query: str, max_results: int = 10) -> list[dict]:
    """Search PubMed for medical literature.

    Args:
        query (str): Medical search query (supports MeSH terms).
        max_results (int): Maximum number of results.
    """
    # Returns one dict per hit with keys pmid, title, journal, year, authors;
    # an empty list when the search yields no ids.
    import httpx

    # Step 1: search — get the matching PubMed ids.
    search_resp = httpx.get("https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi", params={
        "db": "pubmed", "term": query, "retmax": max_results, "retmode": "json"
    }).json()
    ids = search_resp.get("esearchresult", {}).get("idlist", [])
    if not ids:
        return []

    # Step 2: fetch summaries for all ids in a single request.
    summary_resp = httpx.get("https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi", params={
        "db": "pubmed", "id": ",".join(ids), "retmode": "json"
    }).json()
    summaries = summary_resp.get("result", {})

    results = []
    for uid in ids:
        art = summaries.get(uid, {})
        author_list = art.get("authors") or []
        if author_list:
            # First author's name; " et al." only when there really are co-authors
            # (previously it was appended for single-author papers too).
            authors = author_list[0].get("name", "")
            if len(author_list) > 1:
                authors += " et al."
        else:
            authors = ""
        results.append({
            "pmid": uid,
            "title": art.get("title", ""),
            "journal": art.get("source", ""),
            "year": art.get("pubdate", "")[:4],  # first four characters of pubdate
            "authors": authors,
        })
    return results

@tool
def fetch_abstract(pmid: str) -> str:
    """Fetch the abstract for a PubMed article.

    Args:
        pmid (str): PubMed ID.
    """
    import httpx

    # Ask efetch for the plain-text abstract rendering of this record.
    efetch_params = {"db": "pubmed", "id": pmid, "rettype": "abstract", "retmode": "text"}
    response = httpx.get(
        "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi",
        params=efetch_params,
    )
    # Cap the returned text at 2000 characters.
    abstract_text = response.text
    return abstract_text[:2000]

# Three Anthropic-model agents chained as search -> grade -> synthesis.

# Searcher: the only agent with tools (PubMed search + abstract fetch).
# NOTE(review): the convergence check passes at 5 occurrences of "PMID" in the text,
# while refine_prompt asks for "at least 8 relevant papers" — confirm which threshold is intended.
literature_searcher = Agent(
    name="literature_searcher",
    model="anthropic/claude-haiku-4-5-20251001",  # fast + good for search queries
    system_prompt="""You are a medical librarian. Search PubMed systematically using
    multiple relevant MeSH terms and synonyms. Fetch abstracts for the most relevant papers.
    Focus on: RCTs, systematic reviews, and meta-analyses published in last 5 years.""",
    tools=[search_pubmed, fetch_abstract],
    loop_config=LoopConfig(
        max_iterations=3,
        convergence_fn=lambda r: r.text.count("PMID") >= 5,
        refine_prompt="You need more papers. Try different MeSH terms and synonyms. Aim for at least 8 relevant papers.",
    ),
    retry_policy=RetryPolicy(max_retries=2),
)

# Grader: prompt-only agent.
evidence_grader = Agent(
    name="evidence_grader",
    model="anthropic/claude-sonnet-4-6",
    system_prompt="""You are a clinical epidemiologist applying GRADE methodology.
    For each paper, assess:
    - Study design (RCT > cohort > case-control > expert opinion)
    - Risk of bias (randomization, blinding, attrition)
    - Consistency across studies
    - Directness to the clinical question
    - Precision (confidence intervals, sample size)
    Grade overall evidence: High / Moderate / Low / Very Low.""",
)

# Summarizer: prompt-only agent; the only one here with an explicit max_tokens.
clinical_summarizer = Agent(
    name="clinical_summarizer",
    model="anthropic/claude-opus-4-8",  # strongest for nuanced medical synthesis
    system_prompt="""You are a clinical researcher writing for a peer-reviewed journal.
    Synthesize evidence into a structured review with:
    - Background and clinical question
    - Summary of evidence (organized by intervention/outcome)
    - Evidence quality (GRADE ratings)
    - Clinical implications and practice recommendations
    - Limitations and knowledge gaps
    - References (PMID citations)
    Be precise with statistics (NNT, OR, HR, 95% CI).""",
    max_tokens=4096,
)

# Declared edges: search -> grade, search -> synthesis, grade -> synthesis.
# NOTE(review): {input}/{search}/{grade} are presumably filled from the run input and
# the named steps' outputs — confirm against WorkflowBuilder docs.
workflow = (
    WorkflowBuilder()
    .add_agent("search",    literature_searcher, task="{input}")
    .add_agent("grade",     evidence_grader,     task="Grade evidence from:\n{search}")
    .add_agent("synthesis", clinical_summarizer,
               task="Write a clinical evidence summary for: {input}\n\nLiterature:\n{search}\n\nEvidence grades:\n{grade}")
    .then("search", "grade")
    .then("search", "synthesis")
    .then("grade",  "synthesis")
)

async def main():
    """Run the literature-review workflow on a sample clinical question and print the result."""
    clinical_question = (
        "What is the efficacy of GLP-1 receptor agonists for non-alcoholic fatty liver disease?"
    )
    review = await workflow.run(clinical_question)
    print(review.final_output.text)

asyncio.run(main())
Domain · Multi-provider with fallback

E-Commerce Intelligence Platform

Product catalog enrichment, pricing optimization, and review analysis — with multi-provider fallback for high availability.

ecommerce_platform.py
"""
E-commerce intelligence: product enrichment + pricing + sentiment analysis.
Uses multi-provider fallback for 99.9% uptime.
"""
import asyncio
from deepcrew import (
    Agent, Orchestrator, ApexConfig, tool,
    RetryPolicy, FallbackChain, InMemoryProvider, WebSearchSkill,
)

@tool
def get_product_data(product_id: str) -> dict:
    """Fetch product data from catalog.

    Args:
        product_id (str): Product SKU or ID.
    """
    # Placeholder catalog entry: only `product_id` is echoed back.
    listing = {
        "name": "Premium Wireless Headphones",
        "category": "Electronics > Audio",
    }
    commercials = {"price": 149.99, "cost": 52.00, "inventory": 234}
    feedback = {"rating": 4.3, "review_count": 1847}
    return {"id": product_id, **listing, **commercials, **feedback}

@tool
def get_competitor_prices(product_name: str) -> list[dict]:
    """Scrape competitor prices for a product.

    Args:
        product_name (str): Product name to search.
    """
    # Placeholder offers: the same three retailers are returned for any product name.
    offers = (
        ("Amazon", 139.99, True),
        ("Best Buy", 159.99, True),
        ("Walmart", 134.99, False),
    )
    return [
        {"retailer": retailer, "price": price, "in_stock": available}
        for retailer, price, available in offers
    ]

@tool
def fetch_reviews(product_id: str, count: int = 50) -> list[dict]:
    """Fetch customer reviews for a product.

    Args:
        product_id (str): Product ID.
        count (int): Number of reviews to fetch.
    """
    # Placeholder reviews: always the same three entries; `product_id` and `count` are not consulted.
    samples = (
        (5, "Amazing sound quality, very comfortable.", 45),
        (2, "Battery life is disappointing, disconnects often.", 32),
        (4, "Great for the price, noise cancellation works well.", 28),
    )
    return [
        {"rating": stars, "text": body, "helpful": votes}
        for stars, body, votes in samples
    ]

# Fallback chain ensures high availability even if primary model is down
# This one FallbackChain object is shared by content_enricher and pricing_optimizer;
# review_analyst builds its own chain.
fallback = FallbackChain(["anthropic/claude-haiku-4-5-20251001", "gemini/gemini-2.0-flash"])

# Content: product-data tool plus a web-search skill.
content_enricher = Agent(
    name="content_enricher",
    model="openai/gpt-4o",
    system_prompt="""You are an e-commerce content specialist.
    Enrich product listings with:
    - SEO-optimized title and description
    - Key features bullet points (5-7)
    - Target customer persona
    - Use case scenarios
    - Cross-sell/upsell recommendations""",
    tools=[get_product_data],
    skills=[WebSearchSkill()],
    retry_policy=RetryPolicy(max_retries=3),
    fallback_chain=fallback,
)

# Pricing: product-data and competitor-price tools.
pricing_optimizer = Agent(
    name="pricing_optimizer",
    model="openai/gpt-4o-mini",
    system_prompt="""You are a pricing strategist.
    Analyze competitor prices and our margins to recommend optimal pricing:
    - Price point relative to competitors
    - Expected demand elasticity impact
    - Margin impact calculation
    - Bundle pricing opportunities
    - Dynamic pricing triggers (inventory level, seasonality)""",
    tools=[get_product_data, get_competitor_prices],
    retry_policy=RetryPolicy(max_retries=3),
    fallback_chain=fallback,
)

# Reviews: primary model is Gemini, so its fallback is an OpenAI model instead.
review_analyst = Agent(
    name="review_analyst",
    model="gemini/gemini-2.0-flash",
    system_prompt="""You are a customer insights analyst.
    Analyze reviews to extract:
    - Sentiment distribution and trend
    - Top praise themes (what customers love)
    - Top complaint themes (what to improve)
    - Feature requests
    - Competitor comparisons mentioned
    - NPS-style scoring""",
    tools=[fetch_reviews],
    retry_policy=RetryPolicy(max_retries=3),
    fallback_chain=FallbackChain(["openai/gpt-4o-mini"]),
)

# NOTE(review): no fallback is configured here for router_model/apex_model themselves.
orch = Orchestrator(
    agents=[content_enricher, pricing_optimizer, review_analyst],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    apex_config=ApexConfig(cite_sources=True, confidence_threshold=0.75),
)

async def enrich_product(product_id: str) -> str:
    """Ask the orchestrator for an intelligence report on one product.

    Args:
        product_id: Product identifier interpolated into the prompt.

    Returns:
        The orchestrator result's `final_text`.
    """
    result = await orch.run(f"Provide full intelligence report for product {product_id}")
    return result.final_text

# asyncio.run returns the coroutine's result; print it so running the example
# shows the report instead of silently discarding it.
print(asyncio.run(enrich_product("PROD-WH-001")))
Domain · OpenTelemetry + agent spawning

DevOps Incident Response

AI-powered incident response: log analysis, root cause identification, and automated remediation — with full OpenTelemetry tracing and agent spawning for parallel diagnosis.

incident_response.py
"""
DevOps incident response with agent spawning and OTel tracing.
Parent agent spawns specialized sub-agents for parallel root cause analysis.
"""
import asyncio
from deepcrew import (
    Agent, Orchestrator, tool, ObservabilityConfig,
    RetryPolicy, FallbackChain, spawn_agent, SpawnRequest,
)

# OpenTelemetry export settings (collector endpoint and service name).
# NOTE(review): `obs` is not referenced again in the visible part of this example —
# it is not passed to the Orchestrator below. Confirm how it is meant to be wired in.
obs = ObservabilityConfig(
    otel_endpoint="http://otel-collector.internal:4317",
    service_name="incident-response-ai",
)

@tool
def get_error_logs(service: str, minutes: int = 30, log_level: str = "ERROR") -> list[str]:
    """Fetch recent error logs from a service.

    Args:
        service (str): Service name (e.g., 'api-gateway', 'payment-service').
        minutes (int): How many minutes of logs to fetch.
        log_level (str): Log level filter: DEBUG/INFO/WARN/ERROR/CRITICAL.
    """
    # In production: connect to CloudWatch, Datadog, or ELK
    # Placeholder log lines: only `service` is interpolated; `minutes` and
    # `log_level` are not consulted.
    pool_exhausted = f"[ERROR] {service}: Connection pool exhausted (pool_size=10, waiting=47)"
    replica_timeout = f"[ERROR] {service}: Timeout after 30s connecting to postgres-replica-2"
    breaker_open = f"[CRITICAL] {service}: Circuit breaker OPEN for downstream payment-processor"
    return [pool_exhausted, replica_timeout, breaker_open]

@tool
def get_metrics(service: str, metric: str, minutes: int = 30) -> dict:
    """Fetch service metrics from monitoring system.

    Args:
        service (str): Service name.
        metric (str): Metric name (e.g., 'error_rate', 'p99_latency', 'cpu', 'memory').
        minutes (int): Time window in minutes.
    """
    # Placeholder readings: one fixed triple for "error_rate", another for every
    # other metric name; status is always "critical" and `minutes` is not consulted.
    if metric == "error_rate":
        current, baseline, threshold = 94.2, 0.5, 5.0
    else:
        current, baseline, threshold = 3450, 120, 2000
    return {
        "service": service,
        "metric": metric,
        "current": current,
        "baseline": baseline,
        "threshold": threshold,
        "status": "critical",
    }

@tool
def run_diagnostic_command(command: str) -> str:
    """Run a safe read-only diagnostic command.

    Args:
        command (str): Diagnostic command (kubectl describe, pg_stat_activity, etc.).
    """
    # Returns up to 2000 characters of stdout (or up to 500 of stderr when stdout
    # is empty), a "Command blocked for safety" message when the command is not
    # allowlisted, or the error text when the process cannot be run.
    import shlex
    import subprocess

    # Allowlist of read-only commands, matched on parsed tokens rather than on a
    # raw string prefix, so "kubectl getXYZ" or odd spacing cannot slip through.
    SAFE_KUBECTL_VERBS = {"get", "describe", "logs"}
    SAFE_PROGRAMS = {"df", "free"}

    try:
        # shlex keeps quoted arguments together (e.g. -l "app=my svc"),
        # which a plain str.split() would break apart.
        argv = shlex.split(command)
    except ValueError:
        # Unbalanced quotes: refuse rather than guess at the intended arguments.
        return f"Command blocked for safety: {command}"

    allowed = bool(argv) and (
        argv[0] in SAFE_PROGRAMS
        or argv[0].startswith("pg_stat")
        or (argv[0] == "kubectl" and len(argv) > 1 and argv[1] in SAFE_KUBECTL_VERBS)
    )
    if not allowed:
        return f"Command blocked for safety: {command}"

    # NOTE(review): "kubectl get" can still read sensitive resources (e.g. secrets);
    # restrict resource types here if that matters in your cluster.
    try:
        # No shell is involved: argv is executed directly, so shell metacharacters
        # in the command are passed as literal arguments.
        result = subprocess.run(argv, capture_output=True, text=True, timeout=10)
    except (OSError, subprocess.SubprocessError) as e:
        # Missing executable, timeout, etc. — report the problem as text to the caller.
        return str(e)
    return result.stdout[:2000] or result.stderr[:500]

@tool
def create_incident_ticket(title: str, severity: str, description: str, assigned_team: str) -> dict:
    """Create an incident ticket in the ticketing system.

    Args:
        title (str): Incident title.
        severity (str): P1/P2/P3/P4.
        description (str): Detailed description.
        assigned_team (str): Team to assign (sre, backend, database, network).
    """
    # Placeholder: returns the same fixed ticket regardless of the arguments.
    ticket = {"ticket_id": "INC-2847"}
    ticket["url"] = "https://incidents.example.com/INC-2847"
    ticket["created"] = True
    return ticket

# Main incident commander with spawning enabled
# Its own tool list covers logs, metrics and ticket creation; run_diagnostic_command
# appears only in the orchestrator's global_tools below.
incident_commander = Agent(
    name="incident_commander",
    model="openai/gpt-4o",
    system_prompt="""You are an SRE incident commander. Your job is to:
    1. Quickly assess the scope and severity of the incident
    2. Spawn specialized diagnostic agents for parallel root cause analysis
    3. Coordinate findings into a clear timeline and root cause
    4. Recommend immediate mitigations and long-term fixes
    5. Create the incident ticket with full details
    Use the spawn_agent tool to run parallel diagnostic investigations.""",
    tools=[get_error_logs, get_metrics, create_incident_ticket],
    retry_policy=RetryPolicy(max_retries=3),
    fallback_chain=FallbackChain(["anthropic/claude-sonnet-4-6"]),
)

# Single-agent orchestrator.
# NOTE(review): global_tools are presumably made available to spawned sub-agents —
# confirm against the Orchestrator documentation.
orch = Orchestrator(
    agents=[incident_commander],
    router_model="openai/gpt-4o-mini",
    apex_model="openai/gpt-4o",
    global_tools=[get_error_logs, get_metrics, run_diagnostic_command],
    enable_spawn=True,   # commander can spawn specialized sub-agents
)

async def respond_to_incident(alert: str) -> str:
    """Run the orchestrator on an alert and return its final text.

    Args:
        alert (str): Alert description passed unchanged to ``orch.run``.

    Returns:
        str: The ``final_text`` attribute of the orchestrator result.
    """
    return (await orch.run(alert)).final_text

# Script entry point: run the incident response once with a sample alert.
# NOTE: the text returned by respond_to_incident() is discarded here; wrap
# the call in print(...) to see the response.
asyncio.run(respond_to_incident(
    "CRITICAL ALERT: payment-service error rate 94.2% (threshold: 5%). "
    "P99 latency 3450ms (baseline: 120ms). Started 15 minutes ago."
))
Domain · Looping + Skills + Memory

Content Marketing Pipeline

Draft → critique → refine content pipeline using looping for quality convergence, memory for brand consistency, and built-in skills for research and summarization.

content_pipeline.py
"""
Content marketing pipeline:
- Looping for draft → critique → refine quality control
- FileMemoryProvider for brand voice consistency across runs
- Skills: WebSearch + Summarize for research-backed content
"""
import asyncio
from deepcrew import (
    Agent, WorkflowBuilder, tool,
    WebSearchSkill, SummarizeSkill,
    LoopConfig, FileMemoryProvider,
    RetryPolicy,
)

# Persist brand guidelines and past content across runs.
# The same provider instance is passed as `memory=` to both the researcher
# and the writer agents.
# NOTE(review): the path starts with "~"; presumably FileMemoryProvider
# expands it to the home directory — confirm.
brand_memory = FileMemoryProvider("~/.deepcrew/brand_memory.json")

@tool
def get_seo_keywords(topic: str, count: int = 10) -> list[dict]:
    """Get SEO keyword suggestions for a topic.

    Each suggestion is a dict with ``keyword``, ``volume`` and ``difficulty``
    keys. This example implementation has three canned suggestions, so fewer
    than ``count`` items may be returned.

    Args:
        topic (str): Content topic.
        count (int): Maximum number of keyword suggestions to return.
    """
    # In production: use Google Keyword Planner, Ahrefs, or SEMrush API
    suggestions = [
        {"keyword": f"{topic} guide", "volume": 12000, "difficulty": 42},
        {"keyword": f"best {topic}", "volume": 8500, "difficulty": 55},
        {"keyword": f"how to {topic}", "volume": 6200, "difficulty": 38},
    ]
    # Honour the requested limit. Clamp at zero so a negative count yields
    # no suggestions instead of slicing from the end of the list.
    return suggestions[:max(count, 0)]

@tool
def check_readability(text: str) -> dict:
    """Check content readability metrics.

    Returns a dict with ``word_count``, ``sentence_count``,
    ``avg_words_per_sentence``, ``estimated_flesch_grade`` (clamped to the
    range 6-12) and ``reading_time_minutes`` (at 200 words per minute).

    Args:
        text (str): Text to analyze.
    """
    import re

    words = text.split()
    # Split on runs of sentence terminators and drop blank fragments, so a
    # trailing period (or empty input) does not count as an extra sentence.
    sentences = [part for part in re.split(r"[.!?]+", text) if part.strip()]
    # max(..., 1) guards the division when no sentence was found.
    avg_words = len(words) / max(len(sentences), 1)
    return {
        "word_count": len(words),
        "sentence_count": len(sentences),
        "avg_words_per_sentence": round(avg_words, 1),
        "estimated_flesch_grade": min(12, max(6, avg_words * 0.3)),
        "reading_time_minutes": round(len(words) / 200, 1),
    }

# Research agent: combines the custom keyword tool with the built-in web
# search and summarization skills, and shares the brand memory provider.
researcher = Agent(
    name="researcher",
    model="openai/gpt-4o-mini",
    system_prompt="""You are a content researcher. Gather comprehensive, accurate information
    on the topic using web search. Find statistics, expert quotes, and relevant examples.
    Identify the top SEO keywords to target.""",
    tools=[get_seo_keywords],
    # The summarization skill is given its own model string.
    skills=[WebSearchSkill(), SummarizeSkill(model="openai/gpt-4o-mini")],
    memory=brand_memory,
)

# Drafting agent. The loop config re-prompts with refine_prompt until
# convergence_fn accepts the draft (presumably giving up after
# max_iterations — confirm against the LoopConfig docs).
writer = Agent(
    name="writer",
    model="anthropic/claude-haiku-4-5-20251001",
    system_prompt="""You are a content marketing writer. Write engaging, informative content
    that: is SEO-optimized for the target keywords, follows the brand voice (check memory),
    uses data and examples from the research, includes clear headers (H2, H3),
    ends with a compelling CTA, and is free of fluff.""",
    memory=brand_memory,
    loop_config=LoopConfig(
        max_iterations=3,
        convergence_fn=lambda r: (
            # NOTE(review): drafts over 600 words are accepted although
            # refine_prompt asks for at least 700 — confirm the slack is intended.
            len(r.text.split()) > 600 and
            # "##" matches both H2 ("##") and H3 ("###") Markdown headers.
            "##" in r.text and
            # Compare case-insensitively after dropping trailing whitespace,
            # punctuation and Markdown emphasis; otherwise an ending such as
            # "Get started today!" would never be recognised as a CTA.
            r.text.rstrip(" \t\r\n.!?\"'*_)").lower().endswith(
                ("action", "today", "now", "started", "free", "learn more")
            )
        ),
        refine_prompt="""Your draft needs improvement. Ensure:
        1. At least 700 words
        2. Multiple H2/H3 headers for structure
        3. At least 2 statistics or data points
        4. A clear CTA in the final paragraph
        Revise accordingly.""",
    ),
    retry_policy=RetryPolicy(max_retries=2),
)

# Editing agent: reviews the draft and can call check_readability for metrics.
editor = Agent(
    name="editor",
    model="openai/gpt-4o",
    system_prompt="""You are a senior content editor. Review drafts for:
    - Factual accuracy (flag unsupported claims)
    - SEO optimization (keyword density, meta description suggestion)
    - Brand voice consistency
    - Readability (suggest simplifications for complex sentences)
    - Engagement (opening hook, flow, punchiness)
    Provide specific line edits and an overall quality score (1-10).""",
    tools=[check_readability],
)

# Social agent: turns the article into per-platform posts. It has no tools,
# skills or memory configured.
social_media_writer = Agent(
    name="social_writer",
    model="gemini/gemini-2.0-flash",
    system_prompt="""You are a social media expert. Create platform-specific posts from the article:
    - Twitter/X: 3 tweet thread (280 chars each)
    - LinkedIn: Professional long-form post with hashtags
    - Instagram caption: Engaging, emoji-rich, with hashtags
    Each should standalone while teasing the full article.""",
)

# Workflow graph: research -> draft -> {edit, social}.
# Task templates reference {input} and the names of earlier steps
# (presumably substituted with the original input and those steps' outputs — confirm).
# Both "edit" and "social" depend on "draft" and both use {draft}, so the
# social posts are written from the unedited draft, not from the editor's output.
workflow = (
    WorkflowBuilder()
    .add_agent("research",  researcher,          task="Research topic for content: {input}")
    .add_agent("draft",     writer,              task="Write a blog post about:\n\n{input}\n\nResearch:\n{research}")
    .add_agent("edit",      editor,              task="Edit this draft:\n\n{draft}")
    .add_agent("social",    social_media_writer, task="Create social posts from:\n\n{draft}")
    .then("research", "draft")
    .then("draft", "edit")
    .then("draft", "social")
)

async def create_content(topic: str):
    """Run the content workflow for a topic and print its two deliverables.

    Prints the text of the "edit" step followed by the text of the "social"
    step, each under its own banner.

    Args:
        topic (str): Subject passed to ``workflow.run``.
    """
    outcome = await workflow.run(topic)
    sections = (
        ("=== EDITED ARTICLE ===", "edit"),
        ("\n=== SOCIAL MEDIA POSTS ===", "social"),
    )
    for banner, step_name in sections:
        print(banner)
        print(outcome.outputs[step_name].text)

# Script entry point: generate content for one sample topic.
asyncio.run(create_content("10 Python async programming patterns every developer should know"))
Domain · Ollama (fully local, no API key)

Fully Local AI — No Cloud Required

Run a complete multi-agent pipeline with Ollama models on your local machine. Zero cost, complete data privacy, works air-gapped.

shell
# Install Ollama (https://ollama.com)
# Then pull the models you want to use:
ollama pull llama3.2          # general-purpose, 3B params
ollama pull qwen2.5-coder:7b  # code-optimized, 7B params
ollama pull phi3.5            # Microsoft's small but capable model
ollama pull mistral           # great for analysis
local_pipeline.py
"""
Fully local multi-agent pipeline using Ollama.
No API keys, no cloud calls, works offline.
Prerequisites: Ollama running at http://localhost:11434
"""
import asyncio
from deepcrew import Agent, Orchestrator, ApexConfig, tool, LoopConfig
from deepcrew import CodeExecutionSkill

@tool
def read_file(path: str) -> str:
    """Read a local file.

    Returns at most the first 5000 characters, decoded as UTF-8. Failures are
    reported as a message string instead of raising, so the calling agent can
    react to them.

    Args:
        path (str): File path to read.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            # Bound the read itself instead of slicing afterwards, so a very
            # large file is never loaded into memory in full.
            return f.read(5000)
    except FileNotFoundError:
        return f"File not found: {path}"
    except (OSError, UnicodeDecodeError) as e:
        # Directories, permission problems and non-UTF-8 content are reported
        # the same way as a missing file rather than crashing the tool call.
        return f"Could not read {path}: {e}"

@tool
def list_files(directory: str, extension: str = "") -> list[str]:
    """List files in a directory.

    Returns at most 50 entry names in sorted order. On failure the list holds
    a single element: the error message.

    Args:
        directory (str): Directory path.
        extension (str): File extension filter (e.g., '.py', '.md').
    """
    import os
    try:
        # os.listdir returns entries in arbitrary order; sort so that the
        # 50-entry truncation below is deterministic.
        files = sorted(os.listdir(directory))
        if extension:
            files = [f for f in files if f.endswith(extension)]
        return files[:50]
    except Exception as e:
        # Tool boundary: report the problem to the agent instead of raising.
        return [str(e)]

# General-purpose reasoning agent; can inspect the local filesystem through
# the two tools defined in this script.
analyst = Agent(
    name="analyst",
    model="ollama/llama3.2",
    system_prompt="You are a helpful analyst. Reason carefully and provide well-structured answers.",
    tools=[read_file, list_files],
)

# Code-specialized agent. Besides the file tools it gets a code-execution
# skill, and a loop that re-prompts when the reply contains neither "def "
# nor "class " (a simple textual check for Python code).
coder = Agent(
    name="coder",
    model="ollama/qwen2.5-coder:7b",
    system_prompt="You are an expert programmer. Write clean, efficient, well-documented code.",
    tools=[read_file, list_files],
    skills=[CodeExecutionSkill(timeout=15.0)],  # timeout presumably in seconds — confirm
    loop_config=LoopConfig(
        max_iterations=3,
        convergence_fn=lambda r: "def " in r.text or "class " in r.text,
        refine_prompt="Your response doesn't include actual code. Please write the implementation.",
    ),
)

# Writing agent; no tools or skills attached.
writer = Agent(
    name="writer",
    model="ollama/phi3.5",
    system_prompt="You are a technical writer. Write clear, concise documentation.",
)

# Every model string here uses the ollama/ prefix, so routing and synthesis
# are configured against local models as well.
orch = Orchestrator(
    agents=[analyst, coder, writer],
    router_model="ollama/llama3.2",    # local router too
    apex_model="ollama/mistral",       # local synthesis
    apex_config=ApexConfig(cite_sources=False),  # simpler for local models
)

async def main():
    """Ask the local orchestrator to review the current directory and print its answer."""
    # Local code analysis
    request = (
        "Analyze the current directory structure, identify Python files, "
        "and suggest how to add type hints to improve code quality."
    )
    outcome = await orch.run(request)
    print(outcome.final_text)

# Script entry point: run the local analysis once.
asyncio.run(main())

Local-only workflow.yaml

local_workflow.yaml
# Agent definitions: both agents use local Ollama model strings.
agents:
  - name: researcher
    model: ollama/llama3.2
    system_prompt: Research and gather information thoroughly.
    # NOTE(review): web_search presumably needs network access, which would
    # not work in a fully offline / air-gapped setup — confirm or remove.
    tools: [web_search]

  - name: writer
    model: ollama/phi3.5
    system_prompt: Write clear and well-structured content.

# Two-step pipeline: `article` depends on `research` and references it in
# its task template as {research}.
workflow:
  - step: research
    agent: researcher
    task: "{input}"

  - step: article
    agent: writer
    # Double-quoted YAML string, so \n is a real newline in the task text.
    task: "Write about:\n{research}"
    depends_on: [research]
shell
deepcrew run local_workflow.yaml --input "Explain how neural networks learn"
Integration · FastAPI + React

Real-Time Streaming API

Complete FastAPI backend with Server-Sent Events streaming, plus a React hook for consuming the stream in the frontend.

server.py
"""
FastAPI streaming server with deepcrew-ai.
Streams events to the browser as SSE.
"""
import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from deepcrew import Agent, Orchestrator, ApexConfig, tool, ObservabilityConfig
from deepcrew import RetryPolicy, FallbackChain, WebSearchSkill

# FastAPI application. The version string is repeated in the /health payload.
app = FastAPI(title="deepcrew AI API", version="0.2.0")

# CORS: only the http://localhost:3000 origin is allowed (presumably the
# React dev server — confirm); all methods and headers are accepted from it.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# OpenTelemetry export settings.
# NOTE(review): nothing else in this script references `obs` (it is not
# passed to the orchestrator or the agents) — confirm whether constructing
# it is enough to enable tracing.
obs = ObservabilityConfig(
    otel_endpoint="http://localhost:4317",
    service_name="deepcrew-api",
)

@tool
def search_web(query: str) -> str:
    """Search the web for current information.

    Calls the api.duckduckgo.com JSON endpoint and returns the abstract text
    when present, otherwise the text of up to three related topics, otherwise
    a "no results" message.

    Args:
        query (str): Search query.
    """
    import httpx

    # Pass the query via `params` so httpx URL-encodes it; interpolating it
    # into the URL breaks on spaces, "&", "#" and similar characters.
    response = httpx.get(
        "https://api.duckduckgo.com/",
        params={"q": query, "format": "json", "no_redirect": "1"},
    )
    data = response.json()
    # Look at the first three related topics and keep only those that carry
    # text, so the fallback is never a string of bare ", " separators.
    related = [
        topic.get("Text", "")
        for topic in data.get("RelatedTopics", [])[:3]
    ]
    related_text = ", ".join(snippet for snippet in related if snippet)
    return data.get("AbstractText") or related_text or "No results found."

def build_orchestrator() -> Orchestrator:
    """Assemble the orchestrator served by the API.

    Creates a "researcher" agent (web search tool and skill) and an "analyst"
    agent, each with two retries and a fallback model, and wires them into an
    Orchestrator with source citation enabled.

    Returns:
        Orchestrator: The configured orchestrator instance.
    """
    researcher = Agent(
        name="researcher",
        model="openai/gpt-4o-mini",
        system_prompt="Research the topic thoroughly.",
        tools=[search_web],
        skills=[WebSearchSkill()],
        retry_policy=RetryPolicy(max_retries=2),
        fallback_chain=FallbackChain(["anthropic/claude-haiku-4-5-20251001"]),
    )
    analyst = Agent(
        name="analyst",
        model="anthropic/claude-haiku-4-5-20251001",
        system_prompt="Analyze and provide critical insights.",
        retry_policy=RetryPolicy(max_retries=2),
        fallback_chain=FallbackChain(["gemini/gemini-2.0-flash"]),
    )
    apex_settings = ApexConfig(cite_sources=True, confidence_threshold=0.75)
    return Orchestrator(
        agents=[researcher, analyst],
        router_model="openai/gpt-4o-mini",
        apex_model="openai/gpt-4o",
        apex_config=apex_settings,
    )

# Built once at import time; the request handler below uses this single
# module-level instance for every request.
orch = build_orchestrator()

# Request body for POST /v1/query.
class QueryRequest(BaseModel):
    # The user's question; the endpoint rejects it with HTTP 400 when it is
    # empty or whitespace-only.
    query: str
    # True (default): respond with a text/event-stream; False: respond with
    # a single JSON object.
    stream: bool = True

@app.post("/v1/query")
async def query_endpoint(req: QueryRequest):
    # Handles POST /v1/query. Blank queries are rejected with HTTP 400.
    # Depending on req.stream the answer is either one JSON object or a
    # text/event-stream of orchestrator events.
    if not req.query.strip():
        raise HTTPException(status_code=400, detail="Query cannot be empty")

    # Non-streaming path: run to completion and return a summary object.
    if not req.stream:
        result = await orch.run(req.query)
        return {
            "text": result.final_text,
            # Confidence is taken from the last agent result, if there is one.
            "confidence": result.agent_results[-1].confidence if result.agent_results else None,
            "total_tokens": result.total_tokens,
        }

    import json

    async def sse_events():
        # Relay each orchestrator event as an SSE frame; a failure mid-stream
        # is delivered to the client as a final "error" event.
        try:
            async for event in orch.stream(req.query):
                yield event.to_sse()
        except Exception as exc:
            payload = json.dumps({'message': str(exc)})
            yield f"event: error\ndata: {payload}\n\n"

    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
        "Connection": "keep-alive",
    }
    return StreamingResponse(
        sse_events(),
        media_type="text/event-stream",
        headers=sse_headers,
    )

@app.get("/health")
async def health():
    # Liveness probe: returns a static payload; no dependencies are checked.
    payload = dict(status="ok", version="0.2.0")
    return payload
useDeepCrew.ts
// React hook for consuming deepcrew SSE streams
import { useState, useCallback, useRef } from "react";

/** One parsed Server-Sent Event as stored in the hook's `events` list. */
interface DeepCrewEvent {
  /** Value of the SSE `event:` line; the hook falls back to "message" when it is absent. */
  event: string;
  /** Copied from `data.agent_id` by the hook; empty string when the payload has none. */
  agent_id: string;
  /** JSON-decoded payload of the SSE `data:` line. */
  data: Record<string, unknown>;
}

/** Options accepted by `useDeepCrew`. */
interface UseDeepCrewOptions {
  /** Base URL of the API server; the hook defaults it to "http://localhost:8000". */
  apiUrl?: string;
}

/**
 * React hook that POSTs a query to `${apiUrl}/v1/query` and consumes the
 * Server-Sent Events response.
 *
 * Returns the accumulated `text` (from "text_delta" events), the `confidence`
 * (from "apex_done"), the `isStreaming` flag, the last `error`, every parsed
 * event in `events`, plus `query(q)` to start a request and `stop()` to abort
 * the current one. Starting a new query aborts the previous one.
 */
export function useDeepCrew({ apiUrl = "http://localhost:8000" }: UseDeepCrewOptions = {}) {
  const [text, setText] = useState("");
  const [confidence, setConfidence] = useState<number | null>(null);
  const [isStreaming, setIsStreaming] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const [events, setEvents] = useState<DeepCrewEvent[]>([]);
  const abortRef = useRef<AbortController | null>(null);

  const query = useCallback(async (q: string) => {
    // Cancel any in-flight request, then register a controller owned by
    // this call so it can later tell whether it has been superseded.
    abortRef.current?.abort();
    const controller = new AbortController();
    abortRef.current = controller;
    setText(""); setConfidence(null); setError(null); setEvents([]);
    setIsStreaming(true);

    try {
      const res = await fetch(`${apiUrl}/v1/query`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ query: q, stream: true }),
        signal: controller.signal,
      });

      if (!res.ok) throw new Error(`HTTP ${res.status}`);

      const reader = res.body!.getReader();
      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });

        // SSE events are separated by a blank line; the last fragment may be
        // incomplete, so it is kept in the buffer for the next chunk.
        const lines = buffer.split("\n\n");
        buffer = lines.pop() ?? "";

        for (const block of lines) {
          const eventLine = block.match(/^event: (.+)$/m)?.[1];
          const dataLine = block.match(/^data: (.+)$/m)?.[1];
          if (!dataLine) continue;

          const data = JSON.parse(dataLine) as DeepCrewEvent["data"];
          const ev: DeepCrewEvent = { event: eventLine ?? "message", agent_id: data.agent_id as string ?? "", data };

          setEvents(prev => [...prev, ev]);

          if (ev.event === "text_delta") {
            setText(prev => prev + (data.chunk as string ?? ""));
          } else if (ev.event === "apex_done") {
            setConfidence(data.confidence as number ?? null);
          } else if (ev.event === "error") {
            setError(data.message as string ?? "Unknown error");
          }
        }
      }
    } catch (e: unknown) {
      // An abort is an expected outcome (stop() or a newer query), not an error.
      if ((e as Error).name !== "AbortError") setError((e as Error).message);
    } finally {
      // Only the most recent call may clear the flag. Without this check, a
      // call that was aborted by a newer query would reset isStreaming to
      // false while the newer stream is still running.
      if (abortRef.current === controller) setIsStreaming(false);
    }
  }, [apiUrl]);

  const stop = useCallback(() => abortRef.current?.abort(), []);

  return { text, confidence, isStreaming, error, events, query, stop };
}
ChatUI.tsx
import React, { useState } from "react";
import { useDeepCrew } from "./useDeepCrew";

/**
 * Minimal chat form on top of `useDeepCrew`: shows the streamed text, the
 * confidence badge, any error and a typing indicator, with a Send/Stop button.
 */
export default function ChatUI() {
  const [input, setInput] = useState("");
  const { text, confidence, isStreaming, error, query, stop } = useDeepCrew();

  // Suppress the browser's form post and forward non-blank input to the hook.
  function handleSubmit(e: React.FormEvent) {
    e.preventDefault();
    if (!input.trim()) return;
    query(input);
  }

  // While a response is streaming the submit button is swapped for "Stop".
  const actionButton = isStreaming
    ? <button type="button" onClick={stop}>Stop</button>
    : <button type="submit">Send</button>;

  return (
    <div className="chat-container">
      <div className="response-area">
        {text && <p className="response-text">{text}</p>}
        {confidence !== null && (
          <div className="confidence-badge">
            Confidence: {(confidence * 100).toFixed(0)}%
            <div className="confidence-bar" style={{ width: `${confidence * 100}%` }} />
          </div>
        )}
        {error && <p className="error">{error}</p>}
        {isStreaming && <div className="typing-indicator">...</div>}
      </div>
      <form onSubmit={handleSubmit} className="input-form">
        <input
          value={input}
          onChange={e => setInput(e.target.value)}
          placeholder="Ask anything..."
          disabled={isStreaming}
        />
        {actionButton}
      </form>
    </div>
  );
}
shell
# Start the FastAPI server
# (server.py also imports httpx inside search_web; presumably it is pulled in
# as a dependency of the packages below — confirm, or install it explicitly)
pip install fastapi uvicorn deepcrew-ai
uvicorn server:app --reload --port 8000

# Test with curl
# With "stream": true the response is a text/event-stream; if events seem to
# arrive in bursts, adding curl's -N (--no-buffer) flag may help.
curl -X POST http://localhost:8000/v1/query \
  -H "Content-Type: application/json" \
  -d '{"query": "What is the James Webb Space Telescope discovering?", "stream": true}'