Metadata-Version: 2.4
Name: arize-ax-airflow-provider
Version: 1.2.0
Summary: Airflow provider for Arize AX: operators and hooks for datasets, experiments, projects, spans, and ML.
Author: Arize AX Airflow Provider
License-Expression: Apache-2.0
Project-URL: Homepage, https://arize.com/docs/ax/integrations/orchestration/airflow/airflow-provider
Project-URL: Documentation, https://arize.com/docs/ax/integrations/orchestration/airflow/airflow-provider
Project-URL: Operator Reference, https://arize.com/docs/ax/integrations/orchestration/airflow/airflow-operators
Project-URL: Arize AX, https://arize.com/docs/ax/
Project-URL: Support, https://arize.com/support/
Keywords: airflow,arize,arize-ax,provider,operators,hooks
Classifier: Development Status :: 5 - Production/Stable
Classifier: Framework :: Apache Airflow :: Provider
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: apache-airflow>=2.4.0
Requires-Dist: arize==8.27.0
Requires-Dist: requests>=2.20
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0; extra == "dev"
Requires-Dist: pytest-mock>=3.10; extra == "dev"
Requires-Dist: mypy>=1.0; extra == "dev"
Requires-Dist: ruff>=0.1; extra == "dev"
Provides-Extra: system
Requires-Dist: pytest>=7.0; extra == "system"
Requires-Dist: pytest-mock>=3.10; extra == "system"
Requires-Dist: arize==8.27.0; extra == "system"
Dynamic: license-file

# Arize AX Airflow Provider

[![CI](https://github.com/Arize-ai/arize-ax-airflow/actions/workflows/ci.yml/badge.svg)](https://github.com/Arize-ai/arize-ax-airflow/actions/workflows/ci.yml)
[![codecov](https://codecov.io/gh/Arize-ai/arize-ax-airflow/graph/badge.svg)](https://codecov.io/gh/Arize-ai/arize-ax-airflow)
[![PyPI](https://img.shields.io/pypi/v/arize-ax-airflow-provider.svg)](https://pypi.org/project/arize-ax-airflow-provider/)
[![Python Versions](https://img.shields.io/pypi/pyversions/arize-ax-airflow-provider.svg)](https://pypi.org/project/arize-ax-airflow-provider/)
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

The official Apache Airflow provider for **[Arize AX](https://arize.com/docs/ax/)** — schedule, automate, and orchestrate your LLMOps workflows directly from Airflow.

Build DAGs that evaluate prompts continuously, compare experiments before deploying, detect drift, curate datasets from production traces, and gate releases on evaluation scores — all using purpose-built operators that wrap the Arize AX platform.

## Features

- **97 operators** across 12 domains: datasets, experiments, projects, spans, evaluators, prompts, tasks, annotations, AI integrations, API keys, spaces, ML
- **8 sensors** for waiting on dataset readiness, experiment completion, span ingestion, evaluation scores, task runs, and more
- **Built-in gates** — `fail_on_regression=True`, `fail_on_drift=True`, `min_score` thresholds raise `AirflowException` directly, no glue code needed
- **Idempotent operations** — `if_exists="skip"` on creates, `ignore_if_missing=True` on deletes
- **Clean XCom values** — operators return scalar IDs by default, full results available via named XCom keys
- **Continuous evaluation tasks** with Eval Hub for live production monitoring
- **Human-in-the-loop annotations** through annotation queues
- **19 example DAGs** covering CI/CD gates, drift detection, prompt lifecycle, RAG evaluation, and fine-tuning data pipelines

## Installation

```bash
pip install arize-ax-airflow-provider
```

Requires **Python 3.10+**, **Apache Airflow 2.4+**, and **Arize SDK 8.27.0**. The
provider pins `arize==8.27.0` exactly, so installs only use the SDK version the
provider is tested against. The pin is bumped together with a provider release
when a newer SDK is adopted.

## Setup

1. In Airflow UI, go to **Admin → Connections → Add**
2. Set **Connection Id** to `arize_ax_default`
3. Set **Connection Type** to `arize_ax`
4. Set **Password** to your Arize API key
5. (Optional) Set **Extra** to `{"space_id": "your-space-id"}` to use it as the default space

## Quick start

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.arize_ax.operators.experiments import (
    ArizeAxRunExperimentOperator,
    ArizeAxCompareExperimentsOperator,
)
from airflow.providers.arize_ax.operators.prompts import (
    ArizeAxPromotePromptOperator,
)

with DAG(
    dag_id="llm_cicd_gate",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    run = ArizeAxRunExperimentOperator(
        task_id="run_candidate",
        dataset_id="{{ var.value.eval_dataset_id }}",
        name="candidate-{{ ds_nodash }}",
    )

    compare = ArizeAxCompareExperimentsOperator(
        task_id="compare_to_baseline",
        baseline_experiment_id="{{ var.value.baseline_id }}",
        candidate_experiment_id="{{ ti.xcom_pull(task_ids='run_candidate') }}",
        fail_on_regression=True,   # raises if candidate scores worse
    )

    promote = ArizeAxPromotePromptOperator(
        task_id="promote",
        prompt_id="{{ var.value.prompt_id }}",
        label="production",
    )

    run >> compare >> promote
```

That's a complete CI/CD gate. The `fail_on_regression=True` flag means `compare` raises `AirflowException` when the candidate underperforms the baseline — `promote` never runs. No `ShortCircuitOperator`, no `PythonOperator` glue.
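
To make the gate's behavior concrete, here is a rough sketch of the kind of check that `fail_on_regression=True` implies. It is not the provider's implementation: `GateError` stands in for `AirflowException`, and `check_regression`, the score values, and the `tolerance` parameter are hypothetical names used for illustration.

```python
class GateError(Exception):
    """Stand-in for airflow.exceptions.AirflowException (hypothetical)."""


def check_regression(
    baseline: float,
    candidate: float,
    fail_on_regression: bool = True,
    tolerance: float = 0.0,
) -> bool:
    """Return True if the candidate regressed; raise when gating is enabled."""
    regressed = candidate < baseline - tolerance
    if regressed and fail_on_regression:
        raise GateError(
            f"candidate score {candidate:.3f} is below baseline {baseline:.3f}"
        )
    return regressed


# Candidate is better than baseline: no exception, downstream tasks may run.
check_regression(baseline=0.82, candidate=0.85)

# Candidate is worse: the gate raises instead of returning.
try:
    check_regression(baseline=0.82, candidate=0.74)
except GateError as exc:
    print(f"gate failed: {exc}")
```

Under Airflow's default `all_success` trigger rule, a task that raises is marked failed and its downstream tasks are marked `upstream_failed`, which is why `promote` does not run.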

## Operator domains

| Domain | Operators |
|--------|-----------|
| **Datasets** | List, create, get, delete, list/append/annotate examples, export to file, health check, smart refresh |
| **Experiments** | List, create, get, delete, run, list/annotate runs, get score, compare, detect drift, calibration, behavioral regression, budget allocator |
| **Projects** | List, create, get, delete |
| **Spans** | List, log, update evaluations/annotations/metadata, export to DataFrame/Parquet, get metrics, curate to dataset, **curate feedback dataset (self-learning agents)**, export annotated, export to fine-tuning, adaptive sampling |
| **Evaluators** | List, create (template or **code**), get, update, delete, list/get/add versions (template or **code**) |
| **Prompts** | List, create, get, delete, compare, promote, **optimize (meta-prompt via Prompt Learning SDK)** |
| **Tasks** | List, create (evaluation or **run-experiment**), get, **update**, **delete**, list runs, get run, trigger run, cancel run |
| **Annotations** | List/create/delete configs, list/get/create/update/delete queues, list/add/**delete** records, annotate, assign |
| **AI Integrations** | List, get, create, update, delete |
| **API Keys** | List, create, delete, refresh |
| **Spaces** | List, get, create, update, delete |
| **ML** | Log batch/stream, export to DataFrame/Parquet |

## Sensors

| Sensor | Purpose |
|--------|---------|
| `ArizeAxExperimentCompleteSensor` | Wait until an experiment reaches a terminal state |
| `ArizeAxDatasetReadySensor` | Wait until a dataset has at least N examples |
| `ArizeAxSpanCountSensor` | Wait until span count in a project exceeds a threshold |
| `ArizeAxEvaluationScoreSensor` | Wait until evaluation score crosses a threshold |
| `ArizeAxExperimentRunCountSensor` | Wait until experiment has N runs |
| `ArizeAxSpanIngestionSensor` | Wait until span ingestion stabilizes |
| `ArizeAxAnnotationQueueSensor` | Wait until annotation queue is configured |
| `ArizeAxTaskRunSensor` | Wait until a task run reaches a terminal state |
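
Each of these sensors follows the usual poll-until-condition pattern. The sketch below shows that pattern in plain Python for the "terminal state" case. It is a simplified illustration, not the provider's code: `wait_for_terminal_state` and the state names in `TERMINAL_STATES` are hypothetical, while `poke_interval` and `timeout` borrow the parameter names used by Airflow sensors.

```python
import time
from typing import Callable

# Hypothetical state names; the real API may use different ones.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


def wait_for_terminal_state(
    get_state: Callable[[], str],
    poke_interval: float = 60.0,
    timeout: float = 3600.0,
) -> str:
    """Poll get_state() until it returns a terminal state or timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still in state {state!r} after {timeout} seconds")
        time.sleep(poke_interval)


# Simulate a run that moves through three states.
states = iter(["pending", "running", "completed"])
final = wait_for_terminal_state(lambda: next(states), poke_interval=0)
print(final)
```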

## Design patterns

The provider follows established Airflow operator conventions so DAGs read naturally and stay maintainable:

- **Idempotent creates** — Set `if_exists="skip"` to handle 409 conflicts by resolving the existing resource by name
- **Idempotent deletes** — All Delete operators accept `ignore_if_missing=True` (default), logging on 404 instead of raising
- **Built-in gates** — Comparison operators (`CompareExperiments`, `DetectEvalDrift`, `EvaluatorCalibration`, `BehavioralRegression`, `GetExperimentScore`) accept `fail_on_*` / `min_score` params that raise `AirflowException` on failure
- **Param validation** — Operators validate required `space_id` / `project_id` in `execute()` with clear error messages
- **Convenience XCom keys** — List operators push `first_id` and `first_name` for direct chaining via Jinja templates
- **Override evaluations** — `ArizeAxTriggerTaskRunOperator(override_evaluations=True)` re-evaluates spans that already have labels
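
The two idempotency patterns above can be sketched against an in-memory stand-in for a remote API. Everything here is hypothetical and for illustration only: `FakeClient`, `Conflict`, `NotFound`, `create_resource`, and `delete_resource` are not part of the provider or the Arize SDK, and the sketch assumes that any `if_exists` value other than `"skip"` re-raises the conflict.

```python
class Conflict(Exception):
    """Stand-in for an HTTP 409 response (hypothetical)."""


class NotFound(Exception):
    """Stand-in for an HTTP 404 response (hypothetical)."""


class FakeClient:
    """In-memory stand-in for a remote API; not the Arize SDK."""

    def __init__(self) -> None:
        self._by_name: dict[str, str] = {}
        self._next_id = 1

    def create(self, name: str) -> str:
        if name in self._by_name:
            raise Conflict(name)
        self._by_name[name] = f"id-{self._next_id}"
        self._next_id += 1
        return self._by_name[name]

    def get_id_by_name(self, name: str) -> str:
        return self._by_name[name]

    def delete(self, name: str) -> None:
        if name not in self._by_name:
            raise NotFound(name)
        del self._by_name[name]


def create_resource(client: FakeClient, name: str, if_exists: str) -> str:
    """Create a resource; on conflict with if_exists='skip', return the existing ID."""
    try:
        return client.create(name)
    except Conflict:
        if if_exists == "skip":
            return client.get_id_by_name(name)
        raise


def delete_resource(client: FakeClient, name: str, ignore_if_missing: bool = True) -> bool:
    """Delete a resource; return False (and log) if it was already gone."""
    try:
        client.delete(name)
        return True
    except NotFound:
        if ignore_if_missing:
            print(f"{name!r} not found, nothing to delete")
            return False
        raise


client = FakeClient()
first = create_resource(client, "eval-set", if_exists="skip")
second = create_resource(client, "eval-set", if_exists="skip")  # same ID, no error
delete_resource(client, "eval-set")
delete_resource(client, "eval-set")  # logs instead of raising
```

Because both calls converge on the same end state, a retried or re-run task does not fail on work that an earlier attempt already completed.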

## Example DAGs

Bundled in `provider_pkg/example_dags/`:

| Pattern | DAG |
|---------|-----|
| Self-contained smoke test | `example_arize_ax_e2e_dag.py` |
| LLM CI/CD gate | `example_arize_ax_llm_cicd_gate_dag.py` |
| Prompt lifecycle (staging → production) | `example_arize_ax_prompt_lifecycle_dag.py` |
| Prompt A/B testing | `example_arize_ax_prompt_ab_test_dag.py` |
| Drift detection with auto-rollback | `example_arize_ax_drift_detection_dag.py` |
| Behavioral regression detection | `example_arize_ax_behavioral_regression_dag.py` |
| Evaluator calibration vs human labels | `example_arize_ax_evaluator_calibration_dag.py` |
| RAG evaluation pipeline | `example_arize_ax_rag_evaluation_dag.py` |
| Production span curation into datasets | `example_arize_ax_dataset_curation_dag.py` |
| Fine-tuning data pipeline | `example_arize_ax_finetune_data_pipeline_dag.py` |
| Continuous evaluation tasks | `example_arize_ax_tasks_dag.py` |
| Annotation queues for HITL | `example_arize_ax_annotation_queues_dag.py` |
| Multi-model experiment matrix | `example_arize_ax_llm_experiments_dag.py` |
| Self-learning agent (multi-prompt optimization from production feedback) | `example_arize_ax_prompt_optimization_with_feedback_dag.py` |

Plus dataset, experiment, evaluator, span, project, ML, and admin demos for individual domain walkthroughs.

> The self-learning agent demo requires the `prompt-learning-enhanced` SDK
> (available as a git source only — PyPI does not accept direct-URL deps,
> so it is **not** declared as an optional extra here). Install it
> separately on workers that run the optimization DAG:
>
> ```bash
> pip install 'arize-phoenix-evals>=2.0,<3.0' \
>             'prompt-learning-enhanced @ git+https://github.com/Arize-ai/prompt-learning.git'
> ```
>
> The `arize-phoenix-evals<3.0` pin is required because upstream
> `prompt-learning-enhanced` imports `phoenix.evals.models`, which was
> removed in 3.0.0. `ArizeAxOptimizePromptOperator` raises a clear
> `AirflowException` with this exact install line when the SDK is missing.

## Documentation

- **Provider guide:** [Arize AX Airflow Provider](https://arize.com/docs/ax/integrations/orchestration/airflow/airflow-provider) — install, setup, design patterns, example DAGs
- **Operator reference:** [Operators, sensors, and hooks](https://arize.com/docs/ax/integrations/orchestration/airflow/airflow-operators) — full reference for all 97 operators
- **Arize AX docs:** [arize.com/docs/ax](https://arize.com/docs/ax/)
- **Arize SDK reference:** [Arize Python SDK v8](https://arize.com/docs/api-clients/python/version-8/overview)

## Support

- **Help & support:** [arize.com/support](https://arize.com/support/)
- **Slack:** [Arize Community Slack](https://arize-ai.slack.com/)
- **Twitter/X:** [@ArizeAI](https://twitter.com/ArizeAI)

## License

[Apache 2.0](https://github.com/Arize-ai/arize-ax-airflow/blob/main/LICENSE)
