Metadata-Version: 2.4
Name: dagster-turbine
Version: 0.5.13
Summary: Dagster integration for Turbine data quality checks
Requires-Python: >=3.12
Requires-Dist: dagster>=1.12.0
Requires-Dist: turbine-api-client
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == 'dev'
Description-Content-Type: text/markdown

# dagster-turbine

[Dagster](https://docs.dagster.io/) integration for [Turbine](https://pypi.org/project/turbine-data/). Surfaces every Check Definition the Turbine server knows about as a native Dagster `AssetCheckSpec`, executes each one against the running Turbine HTTP API, and publishes the same authenticated `TurbineClient` as a Dagster resource for your own assets.

Two entry points ship in the package:

- **`TurbineChecksComponent`** — declarative Component; configure once in YAML, get asset checks for free.
- **`TurbineServerResource`** — injectable resource for custom assets/checks that want to call Turbine directly.

## Install

```bash
uv add dagster-turbine
```

Requires Python 3.12 or newer. Installs only `dagster` and `turbine-api-client` as dependencies; the Turbine engine itself never enters your Dagster environment.

## Minimal YAML (declarative)

```yaml
# defs/quality/defs.yaml
type: dagster_turbine.TurbineChecksComponent
attributes:
  base_url: http://turbine:8000
  auth: { type: bearer, token: "${TURBINE_TOKEN}" }
```

Reload your code location: each contract on the server appears as one `multi_asset_check` per table, with one `AssetCheckSpec` per registered Turbine check.

## Contract selection

Narrow the discovery to a subset of contracts. Omit `contracts` (or set it to `null`) to cover every contract — the zero-config default.

```yaml
type: dagster_turbine.TurbineChecksComponent
attributes:
  base_url: http://turbine:8000
  contracts: [orders, customers]
  auth: { type: bearer, token: "${TURBINE_TOKEN}" }
```

## Run options

```yaml
type: dagster_turbine.TurbineChecksComponent
attributes:
  base_url: http://turbine:8000
  run:
    incremental: false   # true = only rows newer than the last watermark; mutually exclusive with partition windows
    flag_rows: true      # persist failing-row PKs to the flag matrix
```

Partitioned assets automatically forward their partition window as the Check Window's `since` / `until`. Combining a partitioned asset with `incremental: true` is rejected at execution time: pick the data interval *or* the watermark.

## Blocking (severity-driven)

By default, each generated `AssetCheckSpec` is blocking when the underlying Check Definition's `severity` is `error` (the gating default). Advisory checks (`severity: warn`, authored in the Quality Spec) become non-blocking automatically.

Override per dimension when the topology demands it:

```yaml
type: dagster_turbine.TurbineChecksComponent
attributes:
  base_url: http://turbine:8000
  blocking_dimensions: [validity]   # any check in this dimension blocks regardless of severity
```
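
The decision described in this section fits in a few lines. The sketch below assumes each check exposes a `severity` and a `dimension` as strings; `is_blocking` is a hypothetical helper, not part of the package's API.

```python
def is_blocking(
    severity: str,
    dimension: str,
    blocking_dimensions: frozenset[str] = frozenset(),
) -> bool:
    """Hypothetical helper mirroring the blocking rule described above."""
    # A dimension override wins regardless of severity.
    if dimension in blocking_dimensions:
        return True
    # Otherwise only `error` severity gates downstream assets.
    return severity == "error"
```

For example, a `warn` check in the `validity` dimension is non-blocking by default and becomes blocking once `validity` is listed in `blocking_dimensions`.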

## Auth

Three modes, picked via the `auth` discriminator:

```yaml
auth: { type: none }                                # in-cluster, unauthenticated
auth: { type: bearer, token: "${TURBINE_TOKEN}" }   # static bearer token
auth:
  type: azure_ad                                    # OAuth client-credentials behind PAX
  tenant_id: ${AZ_TENANT_ID}
  client_id: ${AZ_CLIENT_ID}
  client_secret: ${AZ_CLIENT_SECRET}
  scope: api://turbine/.default
```

## TurbineServerResource

Inject the same configured `TurbineClient` into your own assets:

```python
import dagster as dg
from dagster_turbine import TurbineServerResource, BearerAuthConfig

@dg.asset
def turbine_contracts(turbine: TurbineServerResource):
    return turbine.get_client().list_contracts()

defs = dg.Definitions(
    assets=[turbine_contracts],
    resources={
        "turbine": TurbineServerResource(
            base_url="http://turbine:8000",
            auth=BearerAuthConfig(token="..."),
        ),
    },
)
```

## Compatibility

`dagster-turbine >= 0.5.12` requires `turbine-data >= 0.5.12` on the server.
