Metadata-Version: 2.4
Name: evaluatorq
Version: 1.3.2
Summary: An evaluation framework library for Python that provides a flexible way to run parallel evaluations and optionally integrate with the Orq AI platform.
Project-URL: Homepage, https://github.com/orq-ai/orqkit
Project-URL: Repository, https://github.com/orq-ai/orqkit/tree/main/packages/evaluatorq-py
Project-URL: Documentation, https://github.com/orq-ai/orqkit/tree/main/packages/evaluatorq-py
License: MIT
License-File: LICENSE
Requires-Python: >=3.10
Requires-Dist: httpx>=0.28.1
Requires-Dist: loguru>=0.6.0
Requires-Dist: pydantic>=2.0
Requires-Dist: rich>=14.2.0
Provides-Extra: all
Requires-Dist: huggingface-hub>=0.20.0; extra == 'all'
Requires-Dist: kaleido>=0.2.1; extra == 'all'
Requires-Dist: langchain<2.0.0,>=1.0.0; extra == 'all'
Requires-Dist: langgraph>=0.2.0; extra == 'all'
Requires-Dist: openai-agents>=0.1.0; extra == 'all'
Requires-Dist: openai<3.0.0,>=1.0.0; extra == 'all'
Requires-Dist: opentelemetry-api>=1.20.0; extra == 'all'
Requires-Dist: opentelemetry-exporter-otlp-proto-http>=1.20.0; extra == 'all'
Requires-Dist: opentelemetry-sdk>=1.20.0; extra == 'all'
Requires-Dist: opentelemetry-semantic-conventions>=0.41b0; extra == 'all'
Requires-Dist: orq-ai-sdk>=4.4.7; extra == 'all'
Requires-Dist: plotly>=5.18.0; extra == 'all'
Requires-Dist: python-dotenv>=1.0.0; extra == 'all'
Requires-Dist: streamlit>=1.30.0; extra == 'all'
Requires-Dist: typer>=0.12.0; extra == 'all'
Provides-Extra: export
Requires-Dist: kaleido>=0.2.1; extra == 'export'
Provides-Extra: langchain
Requires-Dist: langchain<2.0.0,>=1.0.0; extra == 'langchain'
Provides-Extra: langgraph
Requires-Dist: langgraph>=0.2.0; extra == 'langgraph'
Provides-Extra: openai-agents
Requires-Dist: openai-agents>=0.1.0; extra == 'openai-agents'
Provides-Extra: orq
Requires-Dist: orq-ai-sdk>=4.4.7; extra == 'orq'
Provides-Extra: otel
Requires-Dist: opentelemetry-api>=1.20.0; extra == 'otel'
Requires-Dist: opentelemetry-exporter-otlp-proto-http>=1.20.0; extra == 'otel'
Requires-Dist: opentelemetry-sdk>=1.20.0; extra == 'otel'
Requires-Dist: opentelemetry-semantic-conventions>=0.41b0; extra == 'otel'
Provides-Extra: redteam
Requires-Dist: huggingface-hub>=0.20.0; extra == 'redteam'
Requires-Dist: openai<3.0.0,>=1.0.0; extra == 'redteam'
Requires-Dist: python-dotenv>=1.0.0; extra == 'redteam'
Requires-Dist: typer>=0.12.0; extra == 'redteam'
Provides-Extra: simulation
Requires-Dist: openai<3.0.0,>=1.0.0; extra == 'simulation'
Requires-Dist: orq-ai-sdk>=4.4.7; extra == 'simulation'
Provides-Extra: ui
Requires-Dist: plotly>=5.18.0; extra == 'ui'
Requires-Dist: streamlit>=1.30.0; extra == 'ui'
Description-Content-Type: text/markdown

# evaluatorq-py

An evaluation framework library for Python that provides a flexible way to run parallel evaluations and optionally integrate with the Orq AI platform.

## Why evaluatorq?

Orq's built-in experiment runner works well for evaluating deployments hosted on the platform, but it has limits: you can only target Orq-managed agents and deployments, and evaluation logic is constrained to what the UI exposes.

`evaluatorq` was built to remove those limits. It gives you a full Python evaluation loop you control entirely — bring your own agent, your own data, your own scorers. The Orq platform becomes optional infrastructure for storing results and datasets, not a hard requirement.

**When to use evaluatorq instead of the built-in experiment runner:**

- Your agent runs outside Orq (LangChain, LangGraph, OpenAI Agents SDK, a custom HTTP service, anything)
- You need custom evaluation logic — LLM-as-judge, multi-criteria rubrics, programmatic checks, or external APIs
- You want CI/CD integration with pass/fail signals and exit codes
- You need to compare multiple agent implementations side by side in the same run
- You want full observability via OpenTelemetry into exactly what ran and how long it took

The library is deliberately lightweight: async-first, typed end-to-end, and usable standalone or wired into Orq for result storage and dataset management.

## 🎯 Features

- **Parallel Execution**: Run multiple evaluation jobs concurrently with progress tracking
- **Flexible Data Sources**: Support for inline data, async iterables, and Orq platform datasets
- **Type-safe**: Fully typed with Python type hints and Pydantic models with runtime validation
- **Rich Terminal UI**: Beautiful progress indicators and result tables powered by Rich
- **Orq Platform Integration**: Seamlessly fetch and evaluate datasets from Orq AI (optional)
- **OpenTelemetry Tracing**: Built-in observability with automatic span creation for jobs and evaluators
- **Pass/Fail Tracking**: Evaluators can return pass/fail status for CI/CD integration
- **Built-in Evaluators**: Common evaluators like `string_contains_evaluator` included
- **Integrations**: LangChain, LangGraph, OpenAI Agents SDK, and custom callable support
- **[Red Teaming](src/evaluatorq/redteam/README.md)**: Adaptive OWASP-mapped adversarial security testing for AI agents

## 📖 Table of Contents

- [Installation](#-installation)
- [Getting Started](#-getting-started)
- [Quick Start](#-quick-start)
- [Configuration](#-configuration)
- [Orq Platform Integration](#-orq-platform-integration)
- [OpenTelemetry Tracing](#-opentelemetry-tracing)
- [Pass/Fail Tracking](#-passfail-tracking)
- [LangChain Integration](#-langchain-integration)
- [Red Teaming External Agent Frameworks](#-red-teaming-external-agent-frameworks)
- [API Reference](#-api-reference)

## 📥 Installation

```bash
pip install evaluatorq
# or
uv add evaluatorq
# or
poetry add evaluatorq
```

### Optional Dependencies

If you want to use the Orq platform integration:

```bash
pip install orq-ai-sdk
# or
pip install evaluatorq[orq]
```

For OpenTelemetry tracing (optional):

```bash
pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-http opentelemetry-semantic-conventions
# or
pip install evaluatorq[otel]
```

For LangChain or LangGraph integration:

```bash
pip install evaluatorq[langchain]
# or, for LangGraph graphs
pip install evaluatorq[langgraph]
```

## 🏁 Getting Started

New to evaluatorq? Follow this path to get up and running:

| Step | What you'll learn | Example |
|------|------------------|---------|
| 1. **Basic eval** | Run your first evaluation with inline data | [`pass_fail_simple.py`](examples/lib/basics/pass_fail_simple.py) |
| 2. **Multiple jobs** | Run multiple jobs in parallel on each data point | [`example_runners.py`](examples/lib/basics/example_runners.py) |
| 3. **Reusable patterns** | Create reusable jobs and evaluators | [`eval_reuse.py`](examples/lib/basics/eval_reuse.py) |
| 4. **Datasets** | Load data from the Orq platform | [`dataset_example.py`](examples/lib/datasets/dataset_example.py) |
| 5. **Structured scores** | Return multi-dimensional metrics | [`structured_rubric_eval.py`](examples/lib/structured/structured_rubric_eval.py) |
| 6. **LangChain agent** | Evaluate a LangChain/LangGraph agent | [`langchain_integration_example.py`](examples/lib/integrations/langchain/langchain_integration_example.py) |

> **Tip:** Start with step 1 and work your way up. Each example builds on concepts from the previous one.

## 🚀 Quick Start

### Basic Usage

```python
import asyncio
from evaluatorq import evaluatorq, job, DataPoint, EvaluationResult

@job("text-analyzer")
async def text_analyzer(data: DataPoint, row: int):
    """Analyze text data and return analysis results."""
    text = data.inputs["text"]
    analysis = {
        "length": len(text),
        "word_count": len(text.split()),
        "uppercase": text.upper(),
    }

    return analysis

async def length_check_scorer(params):
    """Evaluate if output length is sufficient."""
    output = params["output"]
    passes_check = output["length"] > 10

    return EvaluationResult(
        value=1 if passes_check else 0,
        explanation=(
            "Output length is sufficient"
            if passes_check
            else f"Output too short ({output['length']} chars, need >10)"
        )
    )

async def main():
    await evaluatorq(
        "text-analysis",
        data=[
            DataPoint(inputs={"text": "Hello world"}),
            DataPoint(inputs={"text": "Testing evaluation"}),
        ],
        jobs=[text_analyzer],
        evaluators=[
            {
                "name": "length-check",
                "scorer": length_check_scorer,
            }
        ],
    )

if __name__ == "__main__":
    asyncio.run(main())
```

> **Tip:** The `@job()` decorator preserves the job name in error messages. Always prefer `@job("name")` over raw functions for better debugging.

### Using Orq Platform Datasets

```python
import asyncio
from evaluatorq import evaluatorq, job, DataPoint, DatasetIdInput

@job("processor")
async def processor(data: DataPoint, row: int):
    """Process each data point from the dataset."""
    result = await process_data(data)  # process_data: your own processing logic
    return result

async def accuracy_scorer(params):
    """Calculate accuracy by comparing output with expected results."""
    data = params["data"]
    output = params["output"]

    score = calculate_score(output, data.expected_output)  # calculate_score: your own metric

    if score > 0.8:
        explanation = "High accuracy match"
    elif score > 0.5:
        explanation = "Partial match"
    else:
        explanation = "Low accuracy match"

    return {"value": score, "explanation": explanation}

async def main():
    # Requires ORQ_API_KEY environment variable
    await evaluatorq(
        "dataset-evaluation",
        data=DatasetIdInput(dataset_id="your-dataset-id"),  # From Orq platform
        jobs=[processor],
        evaluators=[
            {
                "name": "accuracy",
                "scorer": accuracy_scorer,
            }
        ],
    )

if __name__ == "__main__":
    asyncio.run(main())
```

> **Tip:** Use `parallelism` to control how many data points are processed concurrently. Start with a low value (3-5) when calling external APIs to avoid rate limits.

### Advanced Features

#### Multiple Jobs

Run multiple jobs in parallel for each data point:

```python
from evaluatorq import job

@job("preprocessor")
async def preprocessor(data: DataPoint, row: int):
    result = await preprocess(data)
    return result

@job("analyzer")
async def analyzer(data: DataPoint, row: int):
    result = await analyze(data)
    return result

@job("transformer")
async def transformer(data: DataPoint, row: int):
    result = await transform(data)
    return result

await evaluatorq(
    "multi-job-eval",
    data=[...],
    jobs=[preprocessor, analyzer, transformer],
    evaluators=[...],
)
```

#### The `@job()` Decorator

The `@job()` decorator provides two key benefits:

1. **Eliminates boilerplate** - No need to manually wrap returns with `{"name": ..., "output": ...}`
2. **Preserves job names in errors** - When a job fails, the error will include the job name for better debugging

**Decorator pattern (recommended):**
```python
from evaluatorq import job

@job("text-processor")
async def process_text(data: DataPoint, row: int):
    # Clean return - just the data!
    return {"result": data.inputs["text"].upper()}
```

**Functional pattern (for lambdas):**
```python
from evaluatorq import job

# Simple transformations with lambda
uppercase_job = job("uppercase", lambda data, row: data.inputs["text"].upper())
word_count_job = job("word-count", lambda data, row: len(data.inputs["text"].split()))
```

#### Deployment Helper

Easily invoke Orq deployments within your evaluation jobs:

```python
from evaluatorq import evaluatorq, job, invoke, deployment, DatasetIdInput

# Simple one-liner with invoke()
@job("summarizer")
async def summarize_job(data, row):
    text = data.inputs["text"]
    return await invoke("my-deployment", inputs={"text": text})

# Full response with deployment()
@job("analyzer")
async def analyze_job(data, row):
    response = await deployment(
        "my-deployment",
        inputs={"text": data.inputs["text"]},
        metadata={"source": "evaluatorq"},
    )
    print("Raw:", response.raw)
    return response.content

# Chat-style with messages
@job("chatbot")
async def chat_job(data, row):
    return await invoke(
        "chatbot",
        messages=[{"role": "user", "content": data.inputs["question"]}],
    )

# Thread tracking for conversations
@job("assistant")
async def conversation_job(data, row):
    return await invoke(
        "assistant",
        inputs={"query": data.inputs["query"]},
        thread={"id": "conversation-123"},
    )
```

The `invoke()` function returns the text content directly, while `deployment()` returns an object with both `content` and `raw` response for more control.

#### Built-in Evaluators

Use the included evaluators for common use cases:

```python
from evaluatorq import evaluatorq, job, string_contains_evaluator, DatasetIdInput

@job("country-lookup")
async def country_lookup_job(data, row):
    country = data.inputs["country"]
    return await invoke("country-capitals", inputs={"country": country})

await evaluatorq(
    "country-unit-test",
    data=DatasetIdInput(dataset_id="your-dataset-id"),
    jobs=[country_lookup_job],
    evaluators=[string_contains_evaluator()],  # Checks if output contains expected_output
    parallelism=6,
)
```

Available built-in evaluators:

- **`string_contains_evaluator()`** - Checks whether the output contains `expected_output` (case-insensitive by default)
- **`exact_match_evaluator()`** - Checks whether the output exactly matches `expected_output`

```python
# Case-sensitive matching
strict_evaluator = string_contains_evaluator(case_insensitive=False)

# Custom name
my_evaluator = string_contains_evaluator(name="my-contains-check")
```

#### Automatic Error Handling

The `@job()` decorator automatically preserves job names even when errors occur:

```python
from evaluatorq import job

@job("risky-job")
async def risky_operation(data: DataPoint, row: int):
    # If this raises an error, the job name "risky-job" will be preserved
    result = await potentially_failing_operation(data)
    return result

await evaluatorq(
    "error-handling",
    data=[...],
    jobs=[risky_operation],
    evaluators=[...],
)

# Error output will show: "Job 'risky-job' failed: <error details>"
# Without @job decorator, you'd only see: "<error details>"
```

#### Async Data Sources

```python
import asyncio

# Build a list of coroutines that resolve to data points
async def get_data_point(i: int) -> DataPoint:
    await asyncio.sleep(0.01)  # Simulate async data fetching
    return DataPoint(inputs={"value": i})

data_coroutines = [get_data_point(i) for i in range(1000)]

await evaluatorq(
    "async-eval",
    data=data_coroutines,
    jobs=[...],
    evaluators=[...],
)
```

#### Structured Evaluation Results

Evaluators can return structured, multi-dimensional metrics using `EvaluationResultCell`. This is useful for metrics like BERT scores, ROUGE-N scores, or any evaluation that produces multiple sub-scores.

##### Multi-criteria Rubric

Return multiple quality sub-scores in a single evaluator:

```python
from evaluatorq import evaluatorq, job, DataPoint, EvaluationResult, EvaluationResultCell

@job("echo")
async def echo_job(data: DataPoint, row: int):
    return data.inputs["text"]

async def rubric_scorer(params):
    text = str(params["output"])
    return EvaluationResult(
        value=EvaluationResultCell(
            type="rubric",
            value={
                "relevance": min(len(text) / 100, 1),
                "coherence": 0.9 if "." in text else 0.4,
                "fluency": 0.85 if len(text.split()) > 5 else 0.5,
            },
        ),
        explanation="Multi-criteria quality rubric",
    )

await evaluatorq(
    "structured-rubric",
    data=[
        DataPoint(inputs={"text": "The quick brown fox jumps over the lazy dog."}),
        DataPoint(inputs={"text": "Hi"}),
    ],
    jobs=[echo_job],
    evaluators=[{"name": "rubric", "scorer": rubric_scorer}],
)
```

##### Sentiment Distribution

Break down sentiment across categories:

```python
async def sentiment_scorer(params):
    text = str(params["output"]).lower()
    positive_words = ["good", "great", "excellent", "happy", "love"]
    negative_words = ["bad", "terrible", "awful", "sad", "hate"]
    pos_count = sum(1 for w in positive_words if w in text)
    neg_count = sum(1 for w in negative_words if w in text)
    total = max(pos_count + neg_count, 1)

    return EvaluationResult(
        value=EvaluationResultCell(
            type="sentiment",
            value={
                "positive": pos_count / total,
                "negative": neg_count / total,
                "neutral": 1 - (pos_count + neg_count) / total,
            },
        ),
        explanation="Sentiment distribution across categories",
    )
```

##### Safety Scores with Pass/Fail

Combine structured scores with pass/fail tracking for CI/CD:

```python
async def safety_scorer(params):
    text = str(params["output"]).lower()
    categories = {
        "hate_speech": 0.8 if "hate" in text else 0.1,
        "violence": 0.7 if ("kill" in text or "fight" in text) else 0.05,
        "profanity": 0.5 if "damn" in text else 0.02,
    }

    return EvaluationResult(
        value=EvaluationResultCell(
            type="safety",
            value=categories,
        ),
        pass_=all(score < 0.5 for score in categories.values()),
        explanation="Content safety severity scores per category",
    )
```

See the runnable Python examples in the `examples/` directory:

- [`structured_rubric_eval.py`](examples/lib/structured/structured_rubric_eval.py) - Multi-criteria quality rubric
- [`structured_sentiment_eval.py`](examples/lib/structured/structured_sentiment_eval.py) - Sentiment distribution breakdown
- [`structured_safety_eval.py`](examples/lib/structured/structured_safety_eval.py) - Safety scores with pass/fail tracking

> **Note:** Structured results display as `[structured]` in the terminal summary table but are preserved in full when sent to the Orq platform and OpenTelemetry spans.

#### Controlling Parallelism

```python
await evaluatorq(
    "parallel-eval",
    data=[...],
    jobs=[...],
    evaluators=[...],
    parallelism=10,  # Run up to 10 jobs concurrently
)
```
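A `parallelism` cap like this is conventionally built on an `asyncio.Semaphore`. The sketch below shows that standard pattern for illustration only; it is not evaluatorq's actual internals, and `bounded_gather`/`fake_job` are hypothetical names:

```python
import asyncio

async def bounded_gather(coros, limit: int):
    # Cap concurrency with a semaphore: at most `limit` coroutines
    # are awaited at the same time (a sketch of the general pattern,
    # not evaluatorq's implementation).
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    # gather() preserves input order regardless of completion order
    return await asyncio.gather(*(run(c) for c in coros))

async def fake_job(i: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for real work
    return i * 2

results = asyncio.run(bounded_gather([fake_job(i) for i in range(5)], limit=2))
print(results)  # [0, 2, 4, 6, 8]
```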

#### Dashboard Organization with `path`

Use the `path` parameter to organize evaluation results into folders on the Orq dashboard:

```python
await evaluatorq(
    "my-evaluation",
    path="MyProject/Evaluations/Unit Tests",
    data=[...],
    jobs=[...],
    evaluators=[...],
)
```

> **Tip:** Use paths like `"Team/Sprint-42/Feature-X"` to keep experiments organized across teams and sprints.

See [`path_organization.py`](examples/lib/structured/path_organization.py) for a complete example.

#### Evaluation Description

Add a description to document the purpose of each evaluation run:

```python
await evaluatorq(
    "model-comparison",
    description="Compare GPT-4o vs Claude on customer support responses",
    data=[...],
    jobs=[...],
    evaluators=[...],
)
```

#### Disable Progress Display

```python
# Get raw results without terminal output
results = await evaluatorq(
    "silent-eval",
    data=[...],
    jobs=[...],
    evaluators=[...],
    print_results=False,  # Disable progress and table display
)

# Process results programmatically
for result in results:
    print(result.data_point.inputs)
    for job_result in result.job_results:
        print(f"{job_result.job_name}: {job_result.output}")
```

## 🔧 Configuration

### Environment Variables

- `ORQ_API_KEY`: API key for Orq platform integration (required for dataset access and sending results). Also enables automatic OTEL tracing to Orq.
- `ORQ_BASE_URL`: Base URL for Orq platform (default: `https://my.orq.ai`)
- `OTEL_EXPORTER_OTLP_ENDPOINT`: Custom OpenTelemetry collector endpoint (overrides default Orq endpoint)
- `OTEL_EXPORTER_OTLP_HEADERS`: Headers for OTEL exporter (format: `key1=value1,key2=value2`)
- `ORQ_DISABLE_TRACING`: Set to `1` or `true` to disable automatic tracing
- `ORQ_DEBUG`: Enable debug logging for tracing setup

### Evaluation Parameters

Parameters are validated at runtime using Pydantic. The `evaluatorq` function supports three calling styles:

```python
from evaluatorq import evaluatorq, EvaluatorParams

# 1. Keyword arguments (recommended)
await evaluatorq(
    "my-eval",
    data=[...],
    jobs=[...],
    parallelism=5,
)

# 2. Dict style
await evaluatorq("my-eval", {
    "data": [...],
    "jobs": [...],
    "parallelism": 5,
})

# 3. EvaluatorParams instance
await evaluatorq("my-eval", EvaluatorParams(
    data=[...],
    jobs=[...],
    parallelism=5,
))
```

#### Parameter Reference

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `data` | `list[DataPoint]` \| `list[Awaitable[DataPoint]]` \| `DatasetIdInput` | **required** | Data to evaluate |
| `jobs` | `list[Job]` | **required** | Jobs to run on each data point |
| `evaluators` | `list[Evaluator]` \| `None` | `None` | Evaluators to score job outputs |
| `parallelism` | `int` (≥1) | `1` | Number of concurrent jobs |
| `print_results` | `bool` | `True` | Display progress and results table |
| `description` | `str` \| `None` | `None` | Optional evaluation description |
| `path` | `str` \| `None` | `None` | Path for organizing results on the Orq dashboard (e.g., `"Project/Category"`) |

## 📊 Orq Platform Integration

### Automatic Result Sending

When the `ORQ_API_KEY` environment variable is set, evaluatorq automatically sends evaluation results to the Orq platform for visualization and analysis.

```python
# Results are automatically sent when ORQ_API_KEY is set
await evaluatorq(
    "my-evaluation",
    data=[...],
    jobs=[...],
    evaluators=[...],
)
```

#### What Gets Sent

When the `ORQ_API_KEY` is set, the following information is sent to Orq:
- Evaluation name
- Dataset ID (when using Orq datasets)
- Job results with outputs and errors
- Evaluator scores with values and explanations
- Execution timing information

> **Note:** Evaluator explanations are included in the data sent to Orq but are not displayed in the terminal output, to keep the console clean.

#### Result Visualization

After successful submission, you'll see a console message with a link to view your results:

```
📊 View your evaluation results at: <url to the evaluation>
```

The Orq platform provides:
- Interactive result tables
- Score statistics
- Performance metrics
- Historical comparisons

## 🔍 OpenTelemetry Tracing

Evaluatorq automatically creates OpenTelemetry spans for observability into your evaluation runs.

### Span Hierarchy

```
orq.job (independent root per job execution)
└── orq.evaluation (child span per evaluator)
```

### Auto-Enable with Orq

When `ORQ_API_KEY` is set, traces are automatically sent to the Orq platform:

```bash
ORQ_API_KEY=your-api-key python my_eval.py
```

### Custom OTEL Endpoint

Send traces to any OpenTelemetry-compatible backend:

```bash
OTEL_EXPORTER_OTLP_ENDPOINT=https://your-collector:4318 \
OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer token" \
python my_eval.py
```

### Disable Tracing

If you want to disable tracing even when `ORQ_API_KEY` is set:

```bash
ORQ_DISABLE_TRACING=1 python my_eval.py
```

## ✅ Pass/Fail Tracking

Evaluators can return a `pass_` field to indicate pass/fail status:

```python
async def quality_scorer(params):
    """Quality check evaluator with pass/fail."""
    output = params["output"]
    score = calculate_quality(output)

    return {
        "value": score,
        "pass_": score >= 0.8,  # Pass if meets threshold
        "explanation": f"Quality score: {score}",
    }
```

**CI/CD Integration:** When any evaluator returns `pass_: False`, the process exits with code 1. This enables fail-fast behavior in CI/CD pipelines.
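The fail-fast convention can be mirrored when post-processing scores yourself. A minimal sketch over plain dicts (illustrative data; evaluatorq applies this logic for you at the end of a run):

```python
# Illustrative evaluator scores; a missing or None pass_ field does not
# affect the pass/fail outcome.
scores = [
    {"value": 0.92, "pass_": True},
    {"value": 0.41, "pass_": False},
    {"value": 0.88, "pass_": True},
]

all_passed = all(s["pass_"] for s in scores if s.get("pass_") is not None)
exit_code = 0 if all_passed else 1  # non-zero exit fails a CI step
print(exit_code)  # 1
```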

**Pass Rate Display:** The summary table shows pass rate when evaluators use the `pass_` field:

```
┌──────────────────────┬─────────────────┐
│ Pass Rate            │ 75% (3/4)       │
└──────────────────────┴─────────────────┘
```

## 🔗 LangChain Integration

Evaluatorq provides integration with LangChain and LangGraph agents, converting their outputs to the OpenResponses format for standardized evaluation.

### Overview

The LangChain integration allows you to:
- Wrap LangChain agents created with `create_agent()` for use in evaluatorq jobs
- Wrap LangGraph compiled graphs for stateful agent evaluation
- Automatically convert agent outputs to OpenResponses format
- Evaluate agent behavior using standard evaluatorq evaluators

### System Instructions

Use the `instructions` parameter to inject a system prompt into the agent. It can be a static string or a callable that builds instructions dynamically from the dataset row:

```python
from evaluatorq.integrations.langchain_integration import wrap_langchain_agent

# Static instructions
agent_job = wrap_langchain_agent(
    agent,
    name="my-agent",
    instructions="You are a helpful weather assistant.",
)

# Dynamic instructions from dataset inputs
agent_job = wrap_langchain_agent(
    agent,
    name="research-agent",
    instructions=lambda data: (
        f"Research the topic: {data.inputs['topic']}. "
        f"Focus on {data.inputs['focus']}."
    ),
)
```

### Input Modes

The wrapper reads the user input from `data.inputs` in three ways:

- **`prompt`** (default): `data.inputs["prompt"]` — a single string, sent as one user message.
- **`messages`**: `data.inputs["messages"]` — a list of `{"role": ..., "content": ...}` dicts, sent as-is.
- **Both**: when both are present, `messages` are sent first, followed by `prompt` as a final user message.

Change the prompt key with the `prompt_key` parameter (e.g., `prompt_key="question"`).
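The resolution order above can be sketched as a small helper. This is illustrative only, not the wrapper's actual source, and `build_messages` is a hypothetical name:

```python
def build_messages(inputs: dict, prompt_key: str = "prompt") -> list[dict]:
    # Messages (if any) go first, then the prompt as a final user message.
    messages = list(inputs.get("messages", []))
    if prompt_key in inputs:
        messages.append({"role": "user", "content": inputs[prompt_key]})
    return messages

print(build_messages({"prompt": "What's the weather?"}))
# [{'role': 'user', 'content': "What's the weather?"}]
```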

### Examples

Complete examples are available in the examples folder:

- **LangChain Agent**: [`langchain_integration_example.py`](examples/lib/integrations/langchain/langchain_integration_example.py) — Basic agent with weather tools using `wrap_langchain_agent`
- **LangGraph Agent**: [`langgraph_integration_example.py`](examples/lib/integrations/langchain/langgraph_integration_example.py) — LangGraph compiled graph with StateGraph pattern
- **LangGraph Research Agent (advanced)**: [`langgraph_research_eval.py`](examples/lib/integrations/langchain/langgraph_research_eval.py) — Dataset-driven research agent with dynamic `instructions` and multi-criteria evaluators

> **Tip:** Pass the `instructions` parameter to `wrap_langchain_agent` for dynamic system prompts — no need to write a custom job function.

## 🔴 Red Teaming External Agent Frameworks

Evaluatorq supports red teaming agents built with external frameworks. Each integration wraps your agent into a target that the red teaming pipeline can attack.

### Installation

```bash
# LangGraph
pip install evaluatorq[langgraph]

# OpenAI Agents SDK
pip install evaluatorq[openai-agents]

# All extras
pip install evaluatorq[all]
```

### LangGraph

Wrap any compiled LangGraph state graph as a red teaming target. The graph must use `MessagesState` (or a state with a `messages` key).

```python
from langgraph.prebuilt import create_react_agent
from evaluatorq.integrations.langgraph_integration import LangGraphTarget
from evaluatorq.redteam import red_team

# Create your LangGraph agent
graph = create_react_agent(model, tools=[...])

# Wrap it as a red teaming target
target = LangGraphTarget(graph)

# Run red teaming
report = await red_team(target=target)
```

Conversation state is managed via LangGraph thread IDs — each attack gets a fresh thread, and `clone()` creates independent copies for parallel attacks.

Pass extra LangGraph config (e.g., recursion limits) via the `config` parameter:

```python
target = LangGraphTarget(graph, config={"recursion_limit": 50})
```

### LangChain Agents

LangChain agents are covered by the integrations above — no separate target is needed:

- **Agents built with `create_react_agent` or `StateGraph`** (the [recommended approach](https://python.langchain.com/docs/concepts/agents/)) run on LangGraph under the hood → use `LangGraphTarget` directly.
- **Custom chains or legacy `AgentExecutor`** → wrap with `CallableTarget`:

```python
from evaluatorq.integrations.callable_integration import CallableTarget

# Any LangChain chain or AgentExecutor
async def run_chain(prompt: str) -> str:
    result = await chain.ainvoke({"input": prompt})
    return result["output"]

target = CallableTarget(run_chain)
```

### OpenAI Agents SDK

Wrap an OpenAI Agents SDK `Agent` as a red teaming target.

```python
from agents import Agent
from evaluatorq.integrations.openai_agents_integration import OpenAIAgentTarget
from evaluatorq.redteam import red_team

# Create your agent
agent = Agent(name="my-agent", instructions="You are a helpful assistant.")

# Wrap it as a red teaming target
target = OpenAIAgentTarget(agent)

# Run red teaming
report = await red_team(target=target)
```

Conversation history is managed automatically — each attack starts with a clean history, and `clone()` creates copies with empty state.

Pass extra `Runner.run()` kwargs via `run_kwargs`:

```python
target = OpenAIAgentTarget(agent, run_kwargs={"max_turns": 10})
```

### Custom Callable (Escape Hatch)

For frameworks without a dedicated integration, wrap any function that takes a prompt and returns a response:

```python
from evaluatorq.integrations.callable_integration import CallableTarget
from evaluatorq.redteam import red_team

# Async function
async def my_agent(prompt: str) -> str:
    result = await some_framework.run(prompt)
    return result.text

target = CallableTarget(my_agent)

# With state management
history = []

async def stateful_agent(prompt: str) -> str:
    history.append({"role": "user", "content": prompt})
    response = await my_llm.chat(history)
    history.append({"role": "assistant", "content": response})
    return response

target = CallableTarget(stateful_agent, reset_fn=lambda: history.clear())

report = await red_team(target=target)
```

Sync functions are also supported — they are automatically run in a thread to avoid blocking the event loop.
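The offloading behavior described above can be sketched with `asyncio.to_thread` (this mirrors the pattern, not `CallableTarget`'s actual source; the function names are hypothetical):

```python
import asyncio
import time

def slow_sync_agent(prompt: str) -> str:
    time.sleep(0.05)  # stand-in for a blocking call, e.g. a sync HTTP client
    return f"echo: {prompt}"

async def call_without_blocking(prompt: str) -> str:
    # Run the sync callable in a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_sync_agent, prompt)

print(asyncio.run(call_without_blocking("hello")))  # echo: hello
```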

## 📚 API Reference

### `evaluatorq(name, params?, *, data?, jobs?, evaluators?, parallelism?, print_results?, description?, path?) -> EvaluatorqResult`

Main async function to run evaluations.

#### Signature:

```python
async def evaluatorq(
    name: str,
    params: EvaluatorParams | dict[str, Any] | None = None,
    *,
    data: DatasetIdInput | Sequence[Awaitable[DataPoint] | DataPoint] | None = None,
    jobs: list[Job] | None = None,
    evaluators: list[Evaluator] | None = None,
    parallelism: int = 1,
    print_results: bool = True,
    description: str | None = None,
    path: str | None = None,
) -> EvaluatorqResult
```

#### Parameters:

- `name`: String identifier for the evaluation run
- `params`: (Optional) `EvaluatorParams` instance or dict with evaluation parameters
- `data`: List of DataPoint objects, awaitables, or `DatasetIdInput`
- `jobs`: List of job functions to run on each data point
- `evaluators`: Optional list of evaluator configurations
- `parallelism`: Number of concurrent jobs (default: 1, must be ≥1)
- `print_results`: Whether to display progress and results (default: True)
- `description`: Optional description for the evaluation run
- `path`: Optional folder path for organizing results on the Orq dashboard (e.g., `"Project/Category"`)

> **Note:** Parameters can be passed either via the `params` argument (as dict or `EvaluatorParams`) or as keyword arguments. Keyword arguments take precedence over `params` values.

#### Returns:

`EvaluatorqResult` - List of `DataPointResult` objects containing job outputs and evaluator scores.

### Types

```python
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

from pydantic import BaseModel, Field
from typing_extensions import TypedDict

# Output type alias
Output = str | int | float | bool | dict[str, Any] | None

class DataPoint(BaseModel):
    """A data point for evaluation."""
    inputs: dict[str, Any]
    expected_output: Output | None = None

EvaluationResultCellValue = str | float | dict[str, "str | float | dict[str, str | float]"]

class EvaluationResultCell(BaseModel):
    """Structured evaluation result with multi-dimensional metrics."""
    type: str
    value: dict[str, EvaluationResultCellValue]

class EvaluationResult(BaseModel):
    """Result from an evaluator."""
    value: str | float | bool | EvaluationResultCell
    explanation: str | None = None
    pass_: bool | None = None  # Optional pass/fail indicator for CI/CD integration

class EvaluatorScore(BaseModel):
    """Score from an evaluator for a job output."""
    evaluator_name: str
    score: EvaluationResult
    error: str | None = None

class JobResult(BaseModel):
    """Result from a job execution."""
    job_name: str
    output: Output
    error: str | None = None
    evaluator_scores: list[EvaluatorScore] | None = None

class DataPointResult(BaseModel):
    """Result for a single data point."""
    data_point: DataPoint
    error: str | None = None
    job_results: list[JobResult] | None = None

# Type aliases
EvaluatorqResult = list[DataPointResult]

class DatasetIdInput(BaseModel):
    """Input for fetching a dataset from Orq platform."""
    dataset_id: str

class EvaluatorParams(BaseModel):
    """Parameters for running an evaluation (validated at runtime)."""
    data: DatasetIdInput | Sequence[Awaitable[DataPoint] | DataPoint]
    jobs: list[Job]
    evaluators: list[Evaluator] | None = None
    parallelism: int = Field(default=1, ge=1)
    print_results: bool = True
    description: str | None = None

class JobReturn(TypedDict):
    """Job return structure."""
    name: str
    output: Output

Job = Callable[[DataPoint, int], Awaitable[JobReturn]]

class ScorerParameter(TypedDict):
    """Parameters passed to scorer functions."""
    data: DataPoint
    output: Output

Scorer = Callable[[ScorerParameter], Awaitable[EvaluationResult | dict[str, Any]]]

class Evaluator(TypedDict):
    """Evaluator configuration."""
    name: str
    scorer: Scorer

# Deployment helper types
@dataclass
class DeploymentResponse:
    """Response from a deployment invocation."""
    content: str  # Text content of the response
    raw: Any      # Raw API response

# Invoke deployment and get text content
async def invoke(
    key: str,
    inputs: dict[str, Any] | None = None,
    context: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    thread: dict[str, Any] | None = None,  # Must include 'id' key
    messages: list[dict[str, str]] | None = None,
) -> str: ...

# Invoke deployment and get full response
async def deployment(
    key: str,
    inputs: dict[str, Any] | None = None,
    context: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    thread: dict[str, Any] | None = None,  # Must include 'id' key
    messages: list[dict[str, str]] | None = None,
) -> DeploymentResponse: ...

# Built-in evaluators
def string_contains_evaluator(
    case_insensitive: bool = True,
    name: str = "string-contains",
) -> Evaluator: ...

def exact_match_evaluator(
    case_insensitive: bool = False,
    name: str = "exact-match",
) -> Evaluator: ...
```
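A minimal custom evaluator satisfying the `Scorer` and `Evaluator` shapes above might look like this (illustrative; only the structure is prescribed by the types, and the scorer here is a made-up example):

```python
import asyncio
from typing import Any

async def length_scorer(params: dict[str, Any]) -> dict[str, Any]:
    """Score the job output by character length (ScorerParameter-shaped input)."""
    output = params["output"]
    return {
        "value": float(len(str(output or ""))),
        "explanation": "Length of the stringified output",
    }

# Evaluator is a TypedDict, so a plain dict works at runtime.
length_evaluator = {"name": "output-length", "scorer": length_scorer}

score = asyncio.run(length_scorer({"data": None, "output": "hello"}))
print(score["value"])  # 5.0
```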

## 🔴 Red Teaming

Evaluatorq includes a red teaming module for automated security testing of LLMs and AI agents against OWASP vulnerability categories (LLM Top 10 and Agentic Security Initiative).

> **Note:** The built-in frameworks (OWASP LLM Top 10, OWASP ASI) and their vulnerabilities, evaluators, and attack strategies are not runtime-extendable yet. Adding custom vulnerabilities currently requires modifying the package source. A runtime registration API is planned for a future release.

### Quick Start

```bash
pip install "evaluatorq[redteam]"

# Enable shell completion
evaluatorq --install-completion
# or
eq --install-completion
```

#### Test an LLM (OpenAI)

```python
import asyncio
from evaluatorq.redteam import TargetConfig, red_team

report = asyncio.run(red_team(
    "llm:gpt-5-mini",
    categories=["LLM01", "LLM07"],
    max_dynamic_datapoints=5,
    max_turns=2,
    target_config=TargetConfig(
        system_prompt="You are a helpful customer support assistant."
    ),
))
print(f"Resistance rate: {report.summary.resistance_rate:.0%}")
```

#### Test an ORQ agent

```python
# agent: targets auto-select the orq backend
report = asyncio.run(red_team(
    "agent:my-agent-key",
    categories=["LLM01", "ASI01", "ASI02"],
    max_dynamic_datapoints=5,
    max_turns=3,
))
```

### Modes

| Mode | Description |
|------|-------------|
| `dynamic` | Generates adversarial attacks using LLM-based strategy planning and multi-turn orchestration |
| `static` | Runs a pre-built OWASP dataset for reproducible regression testing |
| `hybrid` | Combines dynamic generation with a static dataset in a single run |

### Target Types

- **`llm:<model>`** — Test an LLM directly via OpenAI API. Provide a system prompt via `TargetConfig`.
- **`agent:<key>`** — Test an ORQ platform agent. Auto-discovers tools, memory, and system prompt.
- **`deployment:<key>`** — Test an ORQ deployment.

`agent:` and `deployment:` targets automatically use the ORQ backend.

### LLM Client Configuration

Red teaming needs an OpenAI-compatible LLM for attack generation and evaluation. `OPENAI_*` variables take priority over `ORQ_*`:

| Priority | Variables | Description |
|----------|-----------|-------------|
| 1st | `OPENAI_API_KEY` + `OPENAI_BASE_URL` (optional) | Direct OpenAI or any compatible endpoint |
| 2nd | `ORQ_API_KEY` + `ORQ_BASE_URL` (optional) | ORQ router |

Or pass a custom client: `red_team(..., llm_client=AsyncOpenAI(api_key="sk-..."))`.

### Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `target` | `str \| list[str]` | **required** | Target identifier(s) |
| `mode` | `str` | `"dynamic"` | `"dynamic"`, `"static"`, or `"hybrid"` |
| `categories` | `list[str] \| None` | all | OWASP categories (e.g. `["ASI01", "LLM07"]`) |
| `max_turns` | `int` | `5` | Max conversation turns per attack |
| `max_dynamic_datapoints` | `int \| None` | `None` | Cap generated attack datapoints |
| `attack_model` | `str` | `"gpt-5-mini"` | Model for adversarial prompt generation |
| `evaluator_model` | `str` | `"gpt-5-mini"` | Model for evaluation scoring |
| `parallelism` | `int` | `5` | Max concurrent jobs |
| `name` | `str \| None` | `"red-team"` | Experiment name |
| `backend` | `str` | `"openai"` | `"openai"` or `"orq"` (auto-detected for agent targets) |
| `llm_client` | `AsyncOpenAI \| None` | `None` | Custom LLM client |
| `dataset_path` | `str \| None` | `None` | Path to local static dataset |

### CLI

```bash
# Show all options
eq redteam run --help

# Test an LLM with a system prompt
eq redteam run -t "llm:gpt-5-mini" \
  --system-prompt "You are a helpful assistant." \
  -c LLM01 -c LLM07 --max-turns 2 --max-dynamic-datapoints 5 -y

# Test an ORQ agent
eq redteam run -t "agent:my-agent-key" \
  -c ASI01 -c LLM07 --max-turns 3 -y

# Multi-target comparison
eq redteam run -t "llm:gpt-5-mini" -t "llm:gpt-4o" \
  -c LLM07 --max-turns 2 --max-dynamic-datapoints 3 -y

# Export reports
eq redteam run -t "llm:gpt-5-mini" \
  --save-report report.json --export-md ./reports --export-html ./reports -y

# List previous runs
eq redteam runs
```

See [examples/redteam/](examples/redteam/) for complete Python examples covering both backends.

## 🛠️ Development

```bash
# Install dependencies
uv sync

# Run type checking
uv run basedpyright

# Format code
uv run ruff format

# Lint code
uv run ruff check
```
