Metadata-Version: 2.4
Name: arize-ax-airflow-provider
Version: 1.3.0
Summary: Airflow provider for Arize AX: operators and hooks for datasets, experiments, projects, spans, and ML.
Author: Arize AX Airflow Provider
License-Expression: Apache-2.0
Project-URL: Homepage, https://arize.com/docs/ax/integrations/orchestration/airflow/airflow-provider
Project-URL: Documentation, https://arize.com/docs/ax/integrations/orchestration/airflow/airflow-provider
Project-URL: Operator Reference, https://arize.com/docs/ax/integrations/orchestration/airflow/airflow-operators
Project-URL: Arize AX, https://arize.com/docs/ax/
Project-URL: Support, https://arize.com/support/
Keywords: airflow,arize,arize-ax,provider,operators,hooks
Classifier: Development Status :: 5 - Production/Stable
Classifier: Framework :: Apache Airflow :: Provider
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: apache-airflow>=2.4.0
Requires-Dist: arize==8.27.0
Requires-Dist: requests>=2.20
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0; extra == "dev"
Requires-Dist: pytest-mock>=3.10; extra == "dev"
Requires-Dist: mypy>=1.0; extra == "dev"
Requires-Dist: ruff>=0.1; extra == "dev"
Provides-Extra: system
Requires-Dist: pytest>=7.0; extra == "system"
Requires-Dist: pytest-mock>=3.10; extra == "system"
Requires-Dist: arize==8.27.0; extra == "system"
Dynamic: license-file

# Arize AX Airflow Provider


[![PyPI](https://img.shields.io/pypi/v/arize-ax-airflow-provider.svg)](https://pypi.org/project/arize-ax-airflow-provider/)
[![Python Versions](https://img.shields.io/pypi/pyversions/arize-ax-airflow-provider.svg)](https://pypi.org/project/arize-ax-airflow-provider/)
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)


The official Apache Airflow provider for **[Arize AX](https://arize.com/docs/ax/)** — schedule, automate, and orchestrate your LLMOps workflows directly from Airflow.

Build DAGs that evaluate prompts continuously, compare experiments before deploying, detect drift, curate datasets from production traces, and gate releases on evaluation scores, all using purpose-built operators that wrap the Arize AX platform.

## Features

- **97 operators** across 12 domains: datasets, experiments, projects, spans, evaluators, prompts, tasks, annotations, AI integrations, API keys, spaces, ML
- **8 sensors** for waiting on dataset readiness, experiment completion, span ingestion, evaluation scores, task runs, and more
- **Built-in gates:** `fail_on_regression=True`, `fail_on_drift=True`, and `min_score` thresholds raise `AirflowException` directly, with no glue code needed
- **Idempotent operations:** `if_exists="skip"` on creates, `ignore_if_missing=True` on deletes
- **Clean XCom values:** operators return scalar IDs by default, with full results available via named XCom keys
- **Continuous evaluation tasks** with Eval Hub for live production monitoring
- **Human-in-the-loop annotations** through annotation queues
- **20 example DAGs** covering CI/CD gates, drift detection, prompt lifecycle, RAG evaluation, fine-tuning data pipelines, and a self-contained self-optimizing loop demo

## Installation

```bash
pip install arize-ax-airflow-provider
```

Requires **Python 3.10+**, **Apache Airflow 2.4+**, and **Arize SDK 8.27.0**. The
provider pins `arize==8.27.0` exactly, so installs always use the SDK version it is
tested against; newer SDK versions are adopted together with a new provider release.

## Setup

1. In Airflow UI, go to **Admin → Connections → Add**
2. Set **Connection Id** to `arize_ax_default`
3. Set **Connection Type** to `arize_ax`
4. Set **Password** to your Arize API key
5. (Optional) Set **Extra** to `{"space_id": "your-space-id"}` to use it as the default space

## Quick start

A server-side CI/CD gate: pull a candidate prompt from Prompt Hub, have **Arize Eval Hub** run it against an evaluation dataset (no LLM SDK or model-provider API key is needed on the Airflow worker), compare the result against the production baseline, and promote the prompt on pass. Requires four Airflow Variables: `arize_ax_prompt_name`, `arize_ax_eval_dataset_id`, `arize_ax_baseline_experiment_id`, `arize_ax_ai_integration_id`.

```python
from datetime import datetime
from typing import Any

from airflow import DAG
from airflow.models import Variable
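try:  # Airflow 3, or Airflow 2 with apache-airflow-providers-standard installed
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:  # Airflow 2.x without the standard provider
    from airflow.operators.python import PythonOperator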

from airflow.providers.arize_ax.operators.experiments import (
    ArizeAxCompareExperimentsOperator,
)
from airflow.providers.arize_ax.operators.prompts import (
    ArizeAxGetPromptOperator,
    ArizeAxPromotePromptOperator,
)
from airflow.providers.arize_ax.operators.tasks import (
    ArizeAxCreateRunExperimentTaskOperator,
    ArizeAxGetTaskRunOperator,
    ArizeAxTriggerTaskRunOperator,
)
from airflow.providers.arize_ax.sensors.arize_ax import ArizeAxTaskRunSensor


def build_run_config_from_prompt(**ctx) -> dict[str, Any]:
    """Materialize a Prompt Hub prompt version into a server-side run config."""
    prompt = ctx["ti"].xcom_pull(task_ids="fetch_candidate_prompt")
    version = prompt["version"]
    return {
        "experiment_type": "llm_generation",
        "ai_integration_id": Variable.get("arize_ax_ai_integration_id"),
        "model_name": version["model"],
        "messages": version["messages"],
        "input_variable_format": version["input_variable_format"],
        "invocation_parameters": version.get("invocation_params") or {},
        "provider_parameters": version.get("provider_params") or {},
    }


with DAG(
    dag_id="llm_cicd_gate",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    render_template_as_native_obj=True,  # required: dict XCom must stay a dict
) as dag:

    fetch_candidate_prompt = ArizeAxGetPromptOperator(
        task_id="fetch_candidate_prompt",
        prompt_name="{{ var.value.arize_ax_prompt_name }}",
        version_label="staging",
    )

    build_run_config = PythonOperator(
        task_id="build_run_config",
        python_callable=build_run_config_from_prompt,
    )

    create_task = ArizeAxCreateRunExperimentTaskOperator(
        task_id="create_candidate_task",
        name="candidate-{{ ds_nodash }}",
        dataset_id="{{ var.value.arize_ax_eval_dataset_id }}",
        run_configuration="{{ ti.xcom_pull(task_ids='build_run_config') }}",
        if_exists="skip",
    )

    trigger_run = ArizeAxTriggerTaskRunOperator(
        task_id="trigger_candidate_run",
        task_id_param="{{ ti.xcom_pull(task_ids='create_candidate_task') }}",
        experiment_name="candidate-{{ ds_nodash }}",
    )

    wait_for_run = ArizeAxTaskRunSensor(
        task_id="wait_for_candidate_run",
        run_id="{{ ti.xcom_pull(task_ids='trigger_candidate_run') }}",
        poke_interval=15,
        timeout=900,
        mode="reschedule",
    )

    get_result = ArizeAxGetTaskRunOperator(
        task_id="get_candidate_result",
        run_id="{{ ti.xcom_pull(task_ids='trigger_candidate_run') }}",
    )

    compare = ArizeAxCompareExperimentsOperator(
        task_id="compare_to_baseline",
        baseline_experiment_id="{{ var.value.arize_ax_baseline_experiment_id }}",
        candidate_experiment_id="{{ ti.xcom_pull(task_ids='get_candidate_result')['experiment_id'] }}",
        metric_names=["accuracy"],
        pass_threshold=0.0,
        fail_on_regression=True,
    )

    promote = ArizeAxPromotePromptOperator(
        task_id="promote_to_production",
        prompt_name="{{ var.value.arize_ax_prompt_name }}",
        label="production",
    )

    fetch_candidate_prompt >> build_run_config >> create_task >> trigger_run
    trigger_run >> wait_for_run >> get_result >> compare >> promote
```

That's a complete server-side CI/CD gate. Because `fail_on_regression=True` is set, `compare` raises `AirflowException` when the candidate underperforms the baseline, so `promote` never runs. The same `arize_ax_prompt_name` Variable is read by `fetch_candidate_prompt` (at the `staging` label) and `promote` (which applies the `production` label), so within this DAG the gate is the only path that re-labels production.
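
Why a raised exception is enough to block promotion: Airflow marks `compare_to_baseline` as failed, and under the default `all_success` trigger rule its downstream `promote_to_production` is never executed. The sketch below models that control flow in plain Python. The regression rule it uses (a metric regresses when it drops more than `pass_threshold` below the baseline) is an assumption for illustration, not the operator's documented formula, and `GateFailed` is a hypothetical stand-in for `AirflowException`.

```python
class GateFailed(Exception):
    """Stand-in for airflow.exceptions.AirflowException in this sketch."""


def compare_gate(
    baseline: dict[str, float],
    candidate: dict[str, float],
    metric_names: list[str],
    pass_threshold: float = 0.0,
    fail_on_regression: bool = True,
) -> dict[str, float]:
    """Return candidate-minus-baseline deltas; raise GateFailed on regression.

    ASSUMED rule: a metric regresses when it falls more than pass_threshold
    below the baseline. The real operator's rule may differ.
    """
    deltas = {m: candidate[m] - baseline[m] for m in metric_names}
    regressed = [m for m, d in deltas.items() if d < -pass_threshold]
    if regressed and fail_on_regression:
        raise GateFailed(f"regression on {regressed}: {deltas}")
    return deltas


def run_gate_then_promote(baseline: dict[str, float], candidate: dict[str, float]) -> list[str]:
    """Mimic `compare >> promote` under Airflow's default all_success trigger rule."""
    executed: list[str] = []
    try:
        compare_gate(baseline, candidate, ["accuracy"])
    except GateFailed:
        # compare fails, so promote is marked upstream_failed and never runs.
        return executed
    executed.append("compare_to_baseline")
    executed.append("promote_to_production")
    return executed


print(run_gate_then_promote({"accuracy": 0.82}, {"accuracy": 0.85}))
# ['compare_to_baseline', 'promote_to_production']
print(run_gate_then_promote({"accuracy": 0.82}, {"accuracy": 0.79}))
# []
```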

> **Prefer running the LLM call yourself?** Swap the server-side stack (Get/Create/Trigger/Wait/Get) for a single [`ArizeAxRunExperimentOperator`](https://github.com/Arize-ai/arize-ax-airflow/blob/main/provider_pkg/operators/experiments.py) with a `task=` callable. See [`example_arize_ax_llm_cicd_gate_dag.py`](https://github.com/Arize-ai/arize-ax-airflow/blob/main/provider_pkg/example_dags/example_arize_ax_llm_cicd_gate_dag.py) for the client-side variant.
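
Because `build_run_config` is plain dictionary reshaping, one way to test it without a scheduler is to factor the mapping into a pure function and feed it a sample payload. The payload below only mirrors the keys the Quick start reads (`version.model`, `version.messages`, and so on); its values are made up, and a real Prompt Hub response may carry more fields. `to_run_config` is a hypothetical helper name.

```python
from typing import Any


def to_run_config(prompt: dict[str, Any], ai_integration_id: str) -> dict[str, Any]:
    """Same mapping as build_run_config_from_prompt, minus the Airflow context."""
    version = prompt["version"]
    return {
        "experiment_type": "llm_generation",
        "ai_integration_id": ai_integration_id,
        "model_name": version["model"],
        "messages": version["messages"],
        "input_variable_format": version["input_variable_format"],
        # Missing or None optional params collapse to empty dicts.
        "invocation_parameters": version.get("invocation_params") or {},
        "provider_parameters": version.get("provider_params") or {},
    }


# Hypothetical sample payload; every value here is illustrative only.
sample_prompt = {
    "version": {
        "model": "example-model",
        "messages": [{"role": "user", "content": "Answer: {question}"}],
        "input_variable_format": "f_string",
        "invocation_params": None,
    }
}

config = to_run_config(sample_prompt, ai_integration_id="integration-123")
assert config["model_name"] == "example-model"
assert config["invocation_parameters"] == {}
assert config["provider_parameters"] == {}
```

Inside the DAG, `build_run_config_from_prompt` could then reduce to calling `to_run_config(ctx["ti"].xcom_pull(task_ids="fetch_candidate_prompt"), Variable.get("arize_ax_ai_integration_id"))`.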

## Operator domains

| Domain | Operators |
|--------|-----------|
| **Datasets** | List, create, get, delete, list/append/annotate examples, export to file, health check, smart refresh |
| **Experiments** | List, create, get, delete, run, list/annotate runs, get score, compare, detect drift, calibration, behavioral regression, budget allocator |
| **Projects** | List, create, get, delete |
| **Spans** | List, log, update evaluations/annotations/metadata, export to DataFrame/Parquet, get metrics, curate to dataset, **curate feedback dataset (self-learning agents)**, export annotated, export to fine-tuning, adaptive sampling |
| **Evaluators** | List, create (template or **code**), get, update, delete, list/get/add versions (template or **code**) |
| **Prompts** | List, create, get, delete, compare, promote, **optimize (meta-prompt via Prompt Learning SDK)** |
| **Tasks** | List, create (evaluation or **run-experiment**), get, **update**, **delete**, list runs, get run, trigger run, cancel run |
| **Annotations** | List/create/delete configs, list/get/create/update/delete queues, list/add/**delete** records, annotate, assign |
| **AI Integrations** | List, get, create, update, delete |
| **API Keys** | List, create, delete, refresh |
| **Spaces** | List, get, create, update, delete |
| **ML** | Log batch/stream, export to DataFrame/Parquet |

## Sensors

| Sensor | Purpose |
|--------|---------|
| `ArizeAxExperimentCompleteSensor` | Wait until an experiment reaches a terminal state |
| `ArizeAxDatasetReadySensor` | Wait until a dataset has at least N examples |
| `ArizeAxSpanCountSensor` | Wait until span count in a project exceeds a threshold |
| `ArizeAxEvaluationScoreSensor` | Wait until evaluation score crosses a threshold |
| `ArizeAxExperimentRunCountSensor` | Wait until experiment has N runs |
| `ArizeAxSpanIngestionSensor` | Wait until span ingestion stabilizes |
| `ArizeAxAnnotationQueueSensor` | Wait until annotation queue is configured |
| `ArizeAxTaskRunSensor` | Wait until a task run reaches a terminal state |
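
The sensors take Airflow's standard polling arguments: `poke_interval` sets the delay between checks, `timeout` caps the total wait, and `mode="reschedule"` frees the worker slot between pokes. The sketch below models that arithmetic for the Quick start's `wait_for_candidate_run` (15 s interval, 900 s timeout) against a fake status feed. It is a simplified model, not the sensors' code: the status strings in `TERMINAL` are hypothetical, and the real sensors query Arize AX on each poke.

```python
from collections.abc import Iterator

# Hypothetical terminal states; each real sensor defines its own set.
TERMINAL = frozenset({"completed", "failed", "cancelled"})


def max_pokes(poke_interval: int, timeout: int) -> int:
    """Upper bound on checks in this model (first poke at t=0)."""
    return timeout // poke_interval + 1


def wait_for_terminal(statuses: Iterator[str], poke_interval: int, timeout: int) -> tuple[str, int]:
    """Poll a status feed; return (status, seconds waited) or raise TimeoutError."""
    waited = 0
    while True:
        status = next(statuses)
        if status in TERMINAL:
            return status, waited
        if waited + poke_interval > timeout:
            raise TimeoutError(f"still {status!r} after {waited}s")
        # In reschedule mode the worker slot is released during this gap.
        waited += poke_interval


print(max_pokes(15, 900))  # 61
print(wait_for_terminal(iter(["queued", "running", "completed"]), 15, 900))  # ('completed', 30)
```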

## Design patterns

The provider follows established Airflow operator conventions so DAGs read naturally and stay maintainable:

- **Idempotent creates:** set `if_exists="skip"` to handle 409 conflicts by resolving the existing resource by name
- **Idempotent deletes:** all Delete operators accept `ignore_if_missing` (default `True`), which logs a 404 instead of raising
- **Built-in gates:** comparison operators (`CompareExperiments`, `DetectEvalDrift`, `EvaluatorCalibration`, `BehavioralRegression`, `GetExperimentScore`) accept `fail_on_*` / `min_score` params that raise `AirflowException` when the check fails
- **Param validation:** operators validate the required `space_id` / `project_id` in `execute()` and fail with clear error messages
- **Convenience XCom keys:** List operators push `first_id` and `first_name` for direct chaining via Jinja templates
- **Override evaluations:** `ArizeAxTriggerTaskRunOperator(override_evaluations=True)` re-evaluates spans that already have labels
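
The idempotent create and delete conventions amount to treating two HTTP statuses as success: a 409 on create (when skipping) resolves to the existing resource, and a 404 on delete (when ignoring) is logged rather than raised. The sketch below models both against an in-memory stand-in for the API. `FakeApi`, `ApiError`, `create_resource`, `delete_resource`, and the `"error"` default for `if_exists` are all hypothetical; the real operators do this through the provider's hook and the Arize SDK.

```python
import itertools
import logging

log = logging.getLogger(__name__)


class ApiError(Exception):
    def __init__(self, status: int, message: str = "") -> None:
        super().__init__(f"{status} {message}".strip())
        self.status = status


class FakeApi:
    """In-memory stand-in: resources keyed by id, names must be unique."""

    def __init__(self) -> None:
        self.items: dict[str, str] = {}  # id -> name
        self._ids = itertools.count(1)

    def create(self, name: str) -> str:
        if name in self.items.values():
            raise ApiError(409, "already exists")
        new_id = f"id-{next(self._ids)}"
        self.items[new_id] = name
        return new_id

    def list_all(self) -> list[tuple[str, str]]:
        return list(self.items.items())

    def delete(self, item_id: str) -> None:
        if item_id not in self.items:
            raise ApiError(404, "not found")
        del self.items[item_id]


def create_resource(api: FakeApi, name: str, if_exists: str = "error") -> str:
    try:
        return api.create(name)
    except ApiError as exc:
        if exc.status != 409 or if_exists != "skip":
            raise
        # Resolve the existing resource by name and return its id instead.
        return next(i for i, n in api.list_all() if n == name)


def delete_resource(api: FakeApi, item_id: str, ignore_if_missing: bool = True) -> bool:
    try:
        api.delete(item_id)
        return True
    except ApiError as exc:
        if exc.status == 404 and ignore_if_missing:
            log.info("%s not found; nothing to delete", item_id)  # log, don't raise
            return False
        raise


api = FakeApi()
first = create_resource(api, "eval-set")
assert create_resource(api, "eval-set", if_exists="skip") == first
assert delete_resource(api, first) is True
assert delete_resource(api, first) is False  # second delete is a no-op
```

Re-running a DAG built this way converges on the same end state instead of failing on the second attempt, which is what makes retries and backfills safe.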

## Example DAGs

Bundled in `provider_pkg/example_dags/`:

| Pattern | DAG |
|---------|-----|
| Self-contained smoke test | `example_arize_ax_e2e_dag.py` |
| LLM CI/CD gate | `example_arize_ax_llm_cicd_gate_dag.py` |
| Prompt lifecycle (staging → production) | `example_arize_ax_prompt_lifecycle_dag.py` |
| Prompt A/B testing | `example_arize_ax_prompt_ab_test_dag.py` |
| Drift detection with auto-rollback | `example_arize_ax_drift_detection_dag.py` |
| Behavioral regression detection | `example_arize_ax_behavioral_regression_dag.py` |
| Evaluator calibration vs human labels | `example_arize_ax_evaluator_calibration_dag.py` |
| RAG evaluation pipeline | `example_arize_ax_rag_evaluation_dag.py` |
| Production span curation into datasets | `example_arize_ax_dataset_curation_dag.py` |
| Fine-tuning data pipeline | `example_arize_ax_finetune_data_pipeline_dag.py` |
| Continuous evaluation tasks | `example_arize_ax_tasks_dag.py` |
| Annotation queues for HITL | `example_arize_ax_annotation_queues_dag.py` |
| Multi-model experiment matrix | `example_arize_ax_llm_experiments_dag.py` |
| Self-learning agent (multi-prompt optimization from production feedback) | `example_arize_ax_prompt_optimization_with_feedback_dag.py` |
| Self-optimizing loop (self-contained closed loop: baseline → optimize → candidate → gate → promote) | `example_arize_ax_self_optimizing_loop_dag.py` |

Plus dataset, experiment, evaluator, span, project, ML, and admin demos for individual domain walkthroughs.

> The self-learning agent demo requires the `prompt-learning-enhanced` SDK.
> It is available only as a git source, and PyPI does not accept direct-URL
> dependencies, so it is **not** declared as an optional extra here. Install it
> separately on workers that run the optimization DAG:
>
> ```bash
> pip install 'arize-phoenix-evals>=2.0,<3.0' \
>             'prompt-learning-enhanced @ git+https://github.com/Arize-ai/prompt-learning.git'
> ```
>
> The `arize-phoenix-evals<3.0` pin is required because upstream
> `prompt-learning-enhanced` imports `phoenix.evals.models`, which was
> removed in 3.0.0. `ArizeAxOptimizePromptOperator` raises a clear
> `AirflowException` with this exact install line when the SDK is missing.

## Documentation

- **Provider guide:** [Arize AX Airflow Provider](https://arize.com/docs/ax/integrations/orchestration/airflow/airflow-provider) — install, setup, design patterns, example DAGs
- **Operator reference:** [Operators, sensors, and hooks](https://arize.com/docs/ax/integrations/orchestration/airflow/airflow-operators) — full reference for all 97 operators
- **Arize AX docs:** [arize.com/docs/ax](https://arize.com/docs/ax/)
- **Arize SDK reference:** [Arize Python SDK v8](https://arize.com/docs/api-clients/python/version-8/overview)

## Support

- **Help & support:** [arize.com/support](https://arize.com/support/)
- **Slack:** [Arize Community Slack](https://arize-ai.slack.com/)
- **Twitter/X:** [@ArizeAI](https://twitter.com/ArizeAI)
