Metadata-Version: 2.4
Name: snowflake-data-quality-gate-operator
Version: 0.1.0
Summary: Apache Airflow operator that loads data into Snowflake with built-in data quality checks. Supports S3, GCS, and Azure.
Author-email: Dhairya Sarin <dhairya.1.sarin@gmail.com>
License: Apache-2.0
Project-URL: Homepage, https://github.com/Dhairya-Sarin/airflow-provider-s3-snowflake-quality
Project-URL: Repository, https://github.com/Dhairya-Sarin/airflow-provider-s3-snowflake-quality
Project-URL: Issues, https://github.com/Dhairya-Sarin/airflow-provider-s3-snowflake-quality/issues
Keywords: airflow,provider,snowflake,data-quality,etl,s3,gcs,azure
Classifier: Development Status :: 4 - Beta
Classifier: Environment :: Console
Classifier: Framework :: Apache Airflow
Classifier: Framework :: Apache Airflow :: Provider
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Database
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: apache-airflow>=2.7.0
Requires-Dist: apache-airflow-providers-snowflake>=5.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0; extra == "dev"
Requires-Dist: pytest-mock>=3.10; extra == "dev"
Requires-Dist: ruff>=0.3.0; extra == "dev"
Requires-Dist: mypy>=1.8; extra == "dev"
Dynamic: license-file

# SnowflakeDataQualityGateOperator

An Apache Airflow provider that loads data from **cloud storage** (S3, GCS, or Azure) into **Snowflake** behind a built-in **data quality gate**. Bad data is caught in a transient staging table *before* it ever touches your production tables.

## Why?

Most cloud-to-Snowflake pipelines blindly `COPY INTO` production. When bad data lands, you find out hours later from a broken dashboard. This operator enforces a **stage → validate → promote** pattern in a single, reusable task.

## Installation

```bash
pip install snowflake-data-quality-gate-operator
```

## Quick start

```python
from datetime import datetime

from airflow import DAG
from airflow_s3_snowflake_quality.operators import SnowflakeDataQualityGateOperator
from airflow_s3_snowflake_quality.checks import (
    RowCountCheck, NullCheck, UniquenessCheck,
    FreshnessCheck, AcceptedValuesCheck, CustomSQLCheck,
)

with DAG(
    dag_id="orders_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
):
    load_orders = SnowflakeDataQualityGateOperator(
        task_id="load_orders",
        source_path="s3://my-data-lake/orders/dt={{ ds }}/",  # Jinja-rendered per run
        file_format="PARQUET",
        snowflake_conn_id="snowflake_prod",
        target_database="ANALYTICS",
        target_schema="PUBLIC",
        target_table="ORDERS",
        load_strategy="merge",
        merge_keys=["ORDER_ID"],
        storage_integration="s3_integration",
        quality_checks=[
            RowCountCheck(min=1000),
            NullCheck(column="ORDER_ID", max_fraction=0.0),
            UniquenessCheck(columns=["ORDER_ID"]),
            FreshnessCheck(column="ORDER_TS", max_age_hours=24),
            AcceptedValuesCheck(column="STATUS", values=["pending", "shipped", "delivered", "cancelled"]),
            # {staging_table} is a placeholder for the staging table's name
            CustomSQLCheck(sql="SELECT * FROM {staging_table} WHERE TOTAL_AMOUNT < 0"),
        ],
    )
```

Works with **GCS** and **Azure** too — just change the `source_path`:

```python
source_path="gcs://bucket/orders/dt={{ ds }}/"                                     # Google Cloud Storage
source_path="azure://<account>.blob.core.windows.net/container/orders/dt={{ ds }}/"  # Azure Blob Storage
```
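Only the URL scheme differs between providers. The following is a minimal sketch of how a path could be routed by its scheme. It assumes that the operator dispatches on the prefix. `detect_storage_provider` and the provider labels are hypothetical and not part of the package's API:

```python
from urllib.parse import urlparse

# Hypothetical scheme-to-provider table; the schemes match the prefixes the
# operator accepts for source_path (s3://, gcs://, azure://).
SCHEME_TO_PROVIDER = {"s3": "S3", "gcs": "GCS", "azure": "Azure Blob Storage"}


def detect_storage_provider(source_path: str) -> str:
    """Return the storage provider for a cloud path, or raise ValueError."""
    # urlparse lowercases the scheme, so "S3://..." and "s3://..." are equivalent.
    scheme = urlparse(source_path).scheme
    if scheme not in SCHEME_TO_PROVIDER:
        raise ValueError(
            f"Unsupported source_path {source_path!r}: expected one of "
            + ", ".join(f"{s}://" for s in SCHEME_TO_PROVIDER)
        )
    return SCHEME_TO_PROVIDER[scheme]


print(detect_storage_provider("gcs://bucket/orders/dt=2024-01-01/"))  # GCS
```

If unknown schemes are rejected up front, a typo in `source_path` produces a clear error before any Snowflake query runs.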

## How it works

```
┌──────────┐     ┌────────────────────┐     ┌──────────┐     ┌────────────┐
│  Cloud   │────▶│  Staging Table     │────▶│ Quality  │────▶│ Production │
│  Storage │     │  (transient)       │     │ Checks   │     │ Table      │
└──────────┘     └────────────────────┘     └──────────┘     └────────────┘
  S3 / GCS                                       │ FAIL
  / Azure                                        ▼
                                            Task fails or
                                            returns report
```

1. **Stage** — `COPY INTO` a transient Snowflake table from cloud storage
2. **Validate** — Run every configured quality check against the staged data
3. **Promote** — If all checks pass, move data via `append`, `replace`, or `merge`
4. **Cleanup** — Staging table is always dropped, even on failure
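The four steps map naturally onto a `try`/`finally` block, which is what guarantees cleanup on failure. The sketch below shows the pattern using Python's built-in `sqlite3` as a stand-in for Snowflake. A regular table plays the role of the transient staging table, and only the `append` strategy is shown. `gated_load`, `min_rows`, and the `__staging` suffix are hypothetical. The `"warn"` branch assumes that failed data is reported but not promoted, which is what the diagram suggests:

```python
import sqlite3
from typing import Callable, List, Optional, Sequence

# A check inspects the staging table and returns an error message, or None on pass.
Check = Callable[[sqlite3.Cursor, str], Optional[str]]


def min_rows(n: int) -> Check:
    """Hypothetical check factory: require at least n staged rows."""
    def check(cur: sqlite3.Cursor, table: str) -> Optional[str]:
        count = cur.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
        return None if count >= n else f"expected >= {n} rows, got {count}"
    return check


def gated_load(
    conn: sqlite3.Connection,
    rows: Sequence[tuple],
    target: str,
    checks: Sequence[Check],
    on_failure: str = "fail",
) -> List[str]:
    """Stage rows, validate them, promote on success; always drop the staging table.

    Table names are interpolated into SQL, so they must be trusted identifiers.
    """
    staging = f"{target}__staging"
    cur = conn.cursor()
    try:
        # 1. Stage: create an empty copy of the target's columns, then load the new rows.
        cur.execute(f"CREATE TABLE {staging} AS SELECT * FROM {target} WHERE 0")
        ncols = len(cur.execute(f"PRAGMA table_info({staging})").fetchall())
        placeholders = ", ".join("?" * ncols)
        cur.executemany(f"INSERT INTO {staging} VALUES ({placeholders})", rows)

        # 2. Validate: run every check and collect all failures, not just the first.
        failures = [msg for msg in (check(cur, staging) for check in checks) if msg]
        if failures:
            if on_failure == "fail":
                raise ValueError("Quality gate failed: " + "; ".join(failures))
            return failures  # assumed "warn" behavior: report only, target untouched

        # 3. Promote (append strategy): copy the staged rows into the target.
        cur.execute(f"INSERT INTO {target} SELECT * FROM {staging}")
        return []
    finally:
        # 4. Cleanup: runs on success, on a failed gate, and on any exception.
        cur.execute(f"DROP TABLE IF EXISTS {staging}")
        conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, amount REAL)")
gated_load(conn, [(1, 9.5), (2, 20.0)], "orders", [min_rows(1)])
print(conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0])  # 2
print(gated_load(conn, [(3, 1.0)], "orders", [min_rows(5)], on_failure="warn"))
# ['expected >= 5 rows, got 1']
```

Putting the drop in `finally` means no code path can skip cleanup, whether a check fails, the gate raises, or the load itself errors out.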

## Operator parameters

| Parameter | Type | Description |
|---|---|---|
| `source_path` | `str` | Cloud storage path (`s3://`, `gcs://`, `azure://`). Jinja-templatable |
| `file_format` | `str` | `PARQUET`, `CSV`, `JSON`, `AVRO`, `ORC` |
| `snowflake_conn_id` | `str` | Airflow connection ID for Snowflake |
| `target_database` | `str` | Snowflake database |
| `target_schema` | `str` | Snowflake schema |
| `target_table` | `str` | Snowflake table |
| `load_strategy` | `str` | `"append"` (default), `"replace"`, or `"merge"` |
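| `merge_keys` | `list[str]` | Key columns that match staged rows to existing rows; required when `load_strategy="merge"` |
| `quality_checks` | `list[BaseCheck]` | Quality checks to run against the staged data |
| `on_failure` | `str` | `"fail"` (default) fails the task when a check fails; `"warn"` returns a report instead |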
| `storage_integration` | `str` | Snowflake storage integration name |
| `copy_options` | `str` | Extra `COPY INTO` options |
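The following sketch shows one way these parameters could translate into Snowflake SQL. The function names, the `tgt`/`src` aliases, and the use of `INSERT OVERWRITE` for `replace` are assumptions for illustration. These are not the statements the operator is known to issue. Identifiers are interpolated unquoted, so they must come from trusted configuration:

```python
from typing import Optional, Sequence

FILE_FORMATS = {"PARQUET", "CSV", "JSON", "AVRO", "ORC"}


def build_staging_sql(target: str, staging: str) -> str:
    # Transient tables have no Fail-safe period, which suits throwaway staging data.
    return f"CREATE TRANSIENT TABLE {staging} LIKE {target}"


def build_copy_sql(staging: str, source_path: str, file_format: str,
                   storage_integration: str, copy_options: str = "") -> str:
    """COPY cloud files into the staging table through a storage integration."""
    if file_format.upper() not in FILE_FORMATS:
        raise ValueError(f"Unsupported file_format {file_format!r}")
    sql = (
        f"COPY INTO {staging} FROM '{source_path}' "
        f"STORAGE_INTEGRATION = {storage_integration} "
        f"FILE_FORMAT = (TYPE = {file_format.upper()})"
    )
    # copy_options is appended verbatim, e.g. "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE",
    # which maps Parquet columns onto table columns by name.
    return f"{sql} {copy_options}".strip()


def build_promote_sql(strategy: str, target: str, staging: str,
                      columns: Sequence[str],
                      merge_keys: Optional[Sequence[str]] = None) -> str:
    """Return the statement that moves validated rows from staging into the target."""
    cols = ", ".join(columns)
    if strategy == "append":
        return f"INSERT INTO {target} ({cols}) SELECT {cols} FROM {staging}"
    if strategy == "replace":
        # INSERT OVERWRITE truncates the target and inserts in a single statement.
        return f"INSERT OVERWRITE INTO {target} ({cols}) SELECT {cols} FROM {staging}"
    if strategy == "merge":
        if not merge_keys:
            raise ValueError("merge_keys is required when load_strategy='merge'")
        on = " AND ".join(f"tgt.{k} = src.{k}" for k in merge_keys)
        non_keys = [c for c in columns if c not in merge_keys]
        # Skip the UPDATE branch when every column is a key (nothing to update).
        matched = (
            "WHEN MATCHED THEN UPDATE SET "
            + ", ".join(f"tgt.{c} = src.{c}" for c in non_keys) + " "
            if non_keys else ""
        )
        values = ", ".join(f"src.{c}" for c in columns)
        return (
            f"MERGE INTO {target} tgt USING {staging} src ON {on} "
            f"{matched}WHEN NOT MATCHED THEN INSERT ({cols}) VALUES ({values})"
        )
    raise ValueError(f"Unknown load_strategy {strategy!r}")


print(build_promote_sql("merge", "ANALYTICS.PUBLIC.ORDERS", "ANALYTICS.PUBLIC.ORDERS_STAGING",
                        ["ORDER_ID", "STATUS", "TOTAL_AMOUNT"], ["ORDER_ID"]))
```

Because each statement is built in a pure function, the generated SQL can be unit-tested as plain strings without a warehouse.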

## Quality checks

| Check | What it does | Example |
|---|---|---|
| `RowCountCheck(min, max)` | Row count is within the `min`/`max` bounds | `RowCountCheck(min=1000)` |
| `NullCheck(column, max_fraction)` | Fraction of NULLs in `column` is at most `max_fraction` | `NullCheck(column="ID", max_fraction=0.0)` |
| `UniquenessCheck(columns)` | No duplicate values (or value combinations) in `columns` | `UniquenessCheck(columns=["ID"])` |
| `FreshnessCheck(column, max_age_hours)` | Newest timestamp in `column` is at most `max_age_hours` old | `FreshnessCheck(column="TS", max_age_hours=24)` |
| `AcceptedValuesCheck(column, values)` | Every value in `column` is one of `values` | `AcceptedValuesCheck(column="STATUS", values=[...])` |
| `CustomSQLCheck(sql, name)` | Passes only if the query returns zero rows | `CustomSQLCheck(sql="SELECT * FROM {staging_table} WHERE ...")` |
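Each check reduces to a small query over the staging table. The following sketch illustrates how the semantics in the table could be expressed in SQL and exercised against an in-memory `sqlite3` table. The queries are hypothetical and not necessarily the ones the package runs. Filling in `{staging_table}` with `str.format` is also an assumption:

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional, Sequence


def row_count_ok(cur: sqlite3.Cursor, table: str,
                 min_count: Optional[int] = None, max_count: Optional[int] = None) -> bool:
    n = cur.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    return (min_count is None or n >= min_count) and (max_count is None or n <= max_count)


def null_fraction_ok(cur: sqlite3.Cursor, table: str, column: str, max_fraction: float) -> bool:
    # COUNT(col) skips NULLs, so COUNT(*) - COUNT(col) is the number of NULLs.
    total, nulls = cur.execute(
        f"SELECT COUNT(*), COUNT(*) - COUNT({column}) FROM {table}"
    ).fetchone()
    return total == 0 or nulls / total <= max_fraction  # empty table is left to the row count


def uniqueness_ok(cur: sqlite3.Cursor, table: str, columns: Sequence[str]) -> bool:
    cols = ", ".join(columns)
    dupes = cur.execute(
        f"SELECT COUNT(*) FROM (SELECT {cols} FROM {table} "
        f"GROUP BY {cols} HAVING COUNT(*) > 1)"
    ).fetchone()[0]
    return dupes == 0


def accepted_values_ok(cur: sqlite3.Cursor, table: str, column: str,
                       values: Sequence[str]) -> bool:
    # NULL NOT IN (...) evaluates to NULL, so NULLs are not flagged here.
    marks = ", ".join("?" for _ in values)
    bad = cur.execute(
        f"SELECT COUNT(*) FROM {table} WHERE {column} NOT IN ({marks})", list(values)
    ).fetchone()[0]
    return bad == 0


def freshness_ok(cur: sqlite3.Cursor, table: str, column: str,
                 max_age_hours: float, now: datetime) -> bool:
    # ISO-8601 text with a fixed UTC offset sorts chronologically, so MAX() is the newest.
    newest = cur.execute(f"SELECT MAX({column}) FROM {table}").fetchone()[0]
    if newest is None:
        return False
    return now - datetime.fromisoformat(newest) <= timedelta(hours=max_age_hours)


def custom_sql_ok(cur: sqlite3.Cursor, table: str, sql: str) -> bool:
    # Passes only when the query returns no rows; literal braces in sql would break format().
    return cur.execute(sql.format(staging_table=table)).fetchone() is None


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE stg (ORDER_ID INTEGER, STATUS TEXT, TOTAL_AMOUNT REAL, ORDER_TS TEXT)")
now = datetime(2024, 1, 2, tzinfo=timezone.utc)
ts = (now - timedelta(hours=3)).isoformat()
cur.executemany("INSERT INTO stg VALUES (?, ?, ?, ?)", [
    (1, "pending", 10.0, ts),
    (2, "shipped", 25.5, ts),
    (2, "lost", -4.0, ts),  # duplicate ID, unexpected status, negative amount
])
results = {
    "row_count": row_count_ok(cur, "stg", min_count=1),
    "nulls": null_fraction_ok(cur, "stg", "ORDER_ID", 0.0),
    "unique": uniqueness_ok(cur, "stg", ["ORDER_ID"]),
    "accepted": accepted_values_ok(cur, "stg", "STATUS", ["pending", "shipped", "delivered", "cancelled"]),
    "fresh": freshness_ok(cur, "stg", "ORDER_TS", 24, now),
    "custom": custom_sql_ok(cur, "stg", "SELECT * FROM {staging_table} WHERE TOTAL_AMOUNT < 0"),
}
print(results)
# {'row_count': True, 'nulls': True, 'unique': False, 'accepted': False, 'fresh': True, 'custom': False}
```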

### Writing custom checks

```python
from airflow_s3_snowflake_quality.checks.base import BaseCheck, CheckResult, CheckStatus

class MyCheck(BaseCheck):
    def validate(self, cursor, staging_table) -> CheckResult:
        # Query the staged data; replace "..." with the condition that marks a row as bad
        count = self._execute_scalar(cursor, f"SELECT COUNT(*) FROM {staging_table} WHERE ...")
        if count > 0:
            # A FAILED result stops promotion (or is only reported with on_failure="warn")
            return CheckResult(check_name=self.name, status=CheckStatus.FAILED, message=f"Found {count} bad rows")
        return CheckResult(check_name=self.name, status=CheckStatus.PASSED, message="All good")
```
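A check only needs a cursor and a table name, so it can be unit-tested without a Snowflake connection by mocking the cursor. The `dev` extra already pulls in `pytest-mock`, but this sketch uses the standard library's `unittest.mock` so it runs on its own. So that it runs without the package installed, it defines minimal stand-ins for `BaseCheck`, `CheckResult`, and `CheckStatus`. It also assumes that `_execute_scalar` calls `cursor.execute()` and then `fetchone()`. In a real project, import the package's classes instead and adjust the mock to match how the real `_execute_scalar` reads results. `NegativeAmountCheck` is a hypothetical example:

```python
from dataclasses import dataclass
from enum import Enum
from unittest.mock import MagicMock


# Stand-ins with assumed shapes; in a real project, import these from
# airflow_s3_snowflake_quality.checks.base instead.
class CheckStatus(Enum):
    PASSED = "passed"
    FAILED = "failed"


@dataclass
class CheckResult:
    check_name: str
    status: CheckStatus
    message: str


class BaseCheck:
    @property
    def name(self) -> str:
        return type(self).__name__

    def _execute_scalar(self, cursor, sql: str):
        # Assumed behavior: run the query and return the first column of the first row.
        cursor.execute(sql)
        return cursor.fetchone()[0]


# A custom check following the same pattern as MyCheck above.
class NegativeAmountCheck(BaseCheck):
    def validate(self, cursor, staging_table) -> CheckResult:
        count = self._execute_scalar(
            cursor, f"SELECT COUNT(*) FROM {staging_table} WHERE TOTAL_AMOUNT < 0"
        )
        if count > 0:
            return CheckResult(check_name=self.name, status=CheckStatus.FAILED,
                               message=f"Found {count} rows with negative TOTAL_AMOUNT")
        return CheckResult(check_name=self.name, status=CheckStatus.PASSED,
                           message="No negative amounts")


# Unit tests: the mocked cursor replaces a live Snowflake connection.
def test_negative_amounts_fail():
    cursor = MagicMock()
    cursor.fetchone.return_value = (3,)
    result = NegativeAmountCheck().validate(cursor, "STG_ORDERS")
    assert result.status is CheckStatus.FAILED
    cursor.execute.assert_called_once_with(
        "SELECT COUNT(*) FROM STG_ORDERS WHERE TOTAL_AMOUNT < 0"
    )


def test_clean_data_passes():
    cursor = MagicMock()
    cursor.fetchone.return_value = (0,)
    assert NegativeAmountCheck().validate(cursor, "STG_ORDERS").status is CheckStatus.PASSED
```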

## Testing

```bash
python3 -m venv venv && source venv/bin/activate
pip install -e ".[dev]" && pip install duckdb pyarrow

make all              # lint + typecheck + 62 pytest tests (97% coverage)
make test-local-e2e   # 5 end-to-end scenarios with DuckDB
```

See `integration_test/` for Docker-based integration tests that run the full pipeline against S3, Snowflake, and Airflow.

## License

Apache 2.0
