Metadata-Version: 2.4
Name: airflow-turbine
Version: 0.5.12
Summary: Airflow operator for Turbine data quality checks
Requires-Python: >=3.12
Requires-Dist: apache-airflow>=3.0
Requires-Dist: turbine-client
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: respx>=0.22; extra == 'dev'
Description-Content-Type: text/markdown

# airflow-turbine

[Apache Airflow](https://airflow.apache.org/) operator for [Turbine](https://pypi.org/project/turbine-data/). Drop `TurbineCheckOperator` into a DAG, point it at a Turbine server with an Airflow Connection, and your data-quality checks run as native Airflow tasks. The operator is deferrable — it releases the worker slot while Turbine evaluates, then resumes to log every Check Outcome and fail the task on any breach.

## Install

```bash
uv add airflow-turbine
```

Requires Python 3.12 or newer. The package depends only on `apache-airflow` and `turbine-client`, so the Turbine engine never enters your Airflow environment.

## Minimal example

```python
from airflow_turbine import TurbineCheckOperator

quality = TurbineCheckOperator(
    task_id="quality",
    turbine_conn_id="turbine",
)
```

The Connection's `host` / `port` / `schema` build the base URL; `password` (when set) becomes a static bearer token. With no other configuration the operator runs every contract registered with the server.

## Contract selection

```python
TurbineCheckOperator(
    task_id="quality",
    turbine_conn_id="turbine",
    contracts=["orders", "customers"],   # list, single id, or None to fan out across every contract
)
```

`contracts=None` (the default) fans out across every registered contract; `contracts=[]` is a no-op that succeeds without running anything (useful when the list is templated from an upstream task).

## Check Window from the DAG's data interval

Opt in to forward `data_interval_start` / `data_interval_end` as the Check Window:

```python
TurbineCheckOperator(
    task_id="quality",
    turbine_conn_id="turbine",
    use_data_interval=True,
)
```

Off by default — every scheduled DAG has a data interval, and forwarding it unconditionally would spray warnings on contracts whose SLAs do not declare a latency element. Setting `use_data_interval=True` together with `incremental=True` is rejected at task construction.

## Run options

```python
TurbineCheckOperator(
    task_id="quality",
    turbine_conn_id="turbine",
    incremental=True,    # check rows newer than the last watermark; mutually exclusive with use_data_interval
    flag_rows=True,      # persist failing-row PKs to the flag matrix
)
```

## Auth

Three modes, picked from the Connection:

| Connection shape | Resolved auth |
|---|---|
| `password` set, no OAuth `extra` | `BearerAuth(token=password)` |
| OAuth fields in `extra` (`tenant_id`, `client_id`, `client_secret`, `scope`) | `AzureADClientCredentials(...)` |
| No `password`, no OAuth `extra` | `None` (in-cluster, unauthenticated) |

Override the Connection-derived auth explicitly:

```python
from turbine_client import BearerAuth

TurbineCheckOperator(
    task_id="quality",
    turbine_conn_id="turbine",
    auth=BearerAuth(token="explicit"),
)
```

An explicit `auth=...` still requires `turbine_conn_id` to be set: the trigger re-reads the Connection in the triggerer process and rebuilds the auth there, so credentials are never carried across the deferral boundary.

## Behaviour on failure

Each Check Outcome is logged on resume. The task raises `RuntimeError` when:

- The Check Run finishes with `RunStatus.FAILED`.
- Any Check Outcome is `FAILED` or `ERRORED`.

`WARNED` outcomes log at WARNING level but do **not** fail the task — advisory checks (`severity: warn` in the Quality Spec) surface as a soft signal.
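
The rules above amount to a small log-then-raise loop. The sketch below is one way to express them, not the operator's implementation. `Outcome` and `raise_on_breach` are hypothetical names, and the `PASSED` state is assumed for completeness.

```python
import logging
from enum import Enum

log = logging.getLogger("turbine_sketch")


class Outcome(Enum):
    """Hypothetical outcome states; PASSED is assumed, the rest appear above."""
    PASSED = "passed"
    WARNED = "warned"
    FAILED = "failed"
    ERRORED = "errored"


def raise_on_breach(run_failed: bool, outcomes: dict[str, Outcome]) -> None:
    breaches = []
    for check, outcome in outcomes.items():
        if outcome in (Outcome.FAILED, Outcome.ERRORED):
            log.error("%s: %s", check, outcome.value)
            breaches.append(check)
        elif outcome is Outcome.WARNED:
            log.warning("%s: %s", check, outcome.value)  # soft signal only
        else:
            log.info("%s: %s", check, outcome.value)
    # Every outcome is logged first, so one failing task still shows them all.
    if run_failed or breaches:
        raise RuntimeError(f"Turbine check run failed; breaching checks: {breaches}")
```

Collecting breaches before raising means a single failed task still reports every outcome, rather than stopping at the first failure.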

## Compatibility

`airflow-turbine >= 0.5.12` requires `turbine-data >= 0.5.12` on the server.
