Metadata-Version: 2.4
Name: dataplatform-airflow-plugin
Version: 0.1.1
Summary: Apache Airflow plugin for orchestrating Spark Jobs on Data Platform.
Author-email: gada121982 <gada121982@gmail.com>
License: Apache-2.0
Keywords: airflow,airflow-plugin,data-platform,dataplatform,spark,vng-cloud
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: Apache Airflow
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.10
Requires-Dist: apache-airflow>=2.8.0
Requires-Dist: requests>=2.28.0
Provides-Extra: dev
Requires-Dist: black>=24.0.0; extra == 'dev'
Requires-Dist: isort>=5.12.0; extra == 'dev'
Requires-Dist: mypy>=1.8.0; extra == 'dev'
Requires-Dist: pytest-mock>=3.12.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.4.0; extra == 'dev'
Requires-Dist: types-requests; extra == 'dev'
Description-Content-Type: text/markdown

# DataPlatform Airflow Plugin

An Apache Airflow plugin for orchestrating **Spark Jobs on Data Platform** (VNG Cloud).

The plugin exposes a single `DataPlatformOperator` that manages the full lifecycle of a Spark Job:

1. Acquire an IAM access token via OAuth2 client credentials (with automatic refresh).
2. Submit a Spark Job run to the Data Platform API.
3. Poll the job state until it reaches a terminal state.
4. Cancel the active run when the Airflow task is killed.
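The four steps above can be sketched as a plain polling loop. This is a minimal illustration, not the operator's implementation. `run_to_completion`, `submit`, `get_state`, and `cancel` are hypothetical names standing in for the Data Platform API calls, and token handling (step 1) is assumed to happen inside them.

```python
import time
from typing import Callable

# Terminal states, as listed in the "Spark Job states" table.
TERMINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}


def run_to_completion(
    submit: Callable[[], str],
    get_state: Callable[[str], str],
    cancel: Callable[[str], None],
    polling_period_seconds: float = 15,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Submit a run, poll until it is terminal, cancel it if interrupted.

    submit / get_state / cancel are hypothetical stand-ins for the Data
    Platform API calls; they are not functions exported by the plugin.
    """
    run_id = submit()  # step 2: start a Spark Job run
    try:
        while True:
            state = get_state(run_id)  # step 3: poll the run state
            if state in TERMINAL_STATES:
                return state
            sleep(polling_period_seconds)
    except BaseException:
        # Step 4: anything that interrupts polling (a kill surfacing as an
        # exception, a timeout, an API error) cancels the active run first.
        cancel(run_id)
        raise


# Usage with fake callables: the run goes PENDING -> RUNNING -> SUCCESS.
states = iter(["PENDING", "RUNNING", "SUCCESS"])
final = run_to_completion(
    submit=lambda: "run-1",
    get_state=lambda run_id: next(states),
    cancel=lambda run_id: None,
    sleep=lambda seconds: None,  # skip real waiting in the demo
)
print(final)  # SUCCESS
```

In Airflow itself, a killed task is signalled through the operator's `on_kill()` hook rather than an exception inside the loop; the `try`/`except` here only approximates that behavior.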

## Requirements

- Python 3.10 or newer
- Apache Airflow 2.8 or newer (Airflow 3 supported)
- A Data Platform workspace with a registered Spark Job
- VNG Cloud IAM client credentials (client ID and client secret)

## Installation

### From PyPI

```bash
pip install dataplatform-airflow-plugin
```

### From source

```bash
git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
pip install -e ".[dev]"
```

### Verify the installation

```bash
pip show dataplatform-airflow-plugin
python -c "from dataplatform_airflow_plugin import DataPlatformOperator, VNGCloudHook; print('OK')"
```

Confirm the plugin is registered with Airflow:

```bash
airflow plugins
# Expected output:
# dataplatform | dataplatform-airflow-plugin==<version>: EntryPoint(name='dataplatform', value='dataplatform_airflow_plugin.plugin:DataPlatformPlugin', group='airflow.plugins')
```

### Install on the Airflow Helm chart (development)

In `values.yaml` or an override file:

```yaml
env:
  - name: _PIP_ADDITIONAL_REQUIREMENTS
    value: "dataplatform-airflow-plugin==0.1.1"
```

```bash
helm upgrade <release> . -n <namespace> -f override.yaml
```

Helm performs a rolling restart of the scheduler, workers, triggerer, and api-server pods. Each pod runs `pip install` on startup and pulls the plugin from PyPI.

### Install on the Airflow Helm chart (production)

Bake the plugin into a custom image — see [Kubernetes (production)](#kubernetes-production).

## Configuration

The plugin supports three credential sources, resolved in the following priority order:

> **Airflow Connection** → **Airflow Variables** → **Environment variables**

Pick the option that best fits your environment.
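The priority order can be pictured as a chain of lookups in which the first source that supplies both values wins. In this sketch, plain dictionaries stand in for the Airflow Connection, the Airflow Variables, and the process environment. The function name, the "both values required" rule, and the `RuntimeError` type are assumptions for illustration, not the plugin's code.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def resolve_credentials(
    connection: Mapping[str, str] | None,
    variables: Mapping[str, str],
    environ: Mapping[str, str] = os.environ,
) -> tuple[str, str]:
    """Return (client_id, client_secret) from the first source that has both.

    Plain mappings stand in for the Airflow Connection, Airflow Variables,
    and the environment (a hypothetical simplification).
    """
    # 1. Airflow Connection: client ID in Login, client secret in Password.
    if connection and connection.get("login") and connection.get("password"):
        return connection["login"], connection["password"]
    # 2. Airflow Variables.
    if variables.get("vng_client_id") and variables.get("vng_client_secret"):
        return variables["vng_client_id"], variables["vng_client_secret"]
    # 3. Environment variables.
    if environ.get("VNG_CLIENT_ID") and environ.get("VNG_CLIENT_SECRET"):
        return environ["VNG_CLIENT_ID"], environ["VNG_CLIENT_SECRET"]
    # Same wording as the error described under Troubleshooting.
    raise RuntimeError("VNG Cloud credentials are not configured")


# No Connection, but Variables are set: they win over the environment.
print(resolve_credentials(
    None,
    {"vng_client_id": "cid", "vng_client_secret": "cs"},
    {"VNG_CLIENT_ID": "env-id", "VNG_CLIENT_SECRET": "env-cs"},
))
# ('cid', 'cs')
```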

### Option 1 — Airflow Connection (recommended for production)

Credentials are Fernet-encrypted at rest, managed through the Airflow UI, and easy to scope per environment (`vng_dev`, `vng_prod`, ...).

**UI: Admin → Connections → Add**

| Field           | Value                                                                |
| --------------- | -------------------------------------------------------------------- |
| Connection Id   | `vng_cloud_default` _(default — overridable via `vng_conn_id`)_      |
| Connection Type | `Generic`                                                            |
| Host            | `https://dev-iam-proxy.dataplatform.vngcloud.tech`                   |
| Login           | `<VNG_CLIENT_ID>`                                                    |
| Password        | `<VNG_CLIENT_SECRET>`                                                |
| Extra (JSON)    | _(optional — see below)_                                             |

**Extra** (optional JSON to override default endpoints):

```json
{
  "data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech",
  "token_path": "/accounts-api/v2/auth/token",
  "fe_url": "https://dev-app.dataplatform.vngcloud.tech"
}
```
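How the Extra JSON combines with the built-in defaults can be sketched as a dictionary merge. The default values are the dev endpoints listed in this README. The helper name is hypothetical, and the assumption that the token URL is the connection Host joined with `token_path` is inferred from the field names rather than taken from the plugin source.

```python
import json

# Dev defaults taken from the Configuration section of this README.
DEFAULTS = {
    "data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech",
    "token_path": "/accounts-api/v2/auth/token",
    "fe_url": "https://dev-app.dataplatform.vngcloud.tech",
}


def endpoints_from_connection(host: str, extra_json: str) -> dict[str, str]:
    """Overlay the Connection's Extra JSON on the defaults (assumed behavior)."""
    extra = json.loads(extra_json) if extra_json else {}
    merged = {**DEFAULTS, **extra}  # keys present in Extra take precedence
    # Assumption: token URL = IAM host + token_path, with exactly one slash.
    merged["token_url"] = host.rstrip("/") + "/" + merged["token_path"].lstrip("/")
    return merged


eps = endpoints_from_connection(
    "https://dev-iam-proxy.dataplatform.vngcloud.tech",
    '{"data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech"}',
)
print(eps["token_url"])
# https://dev-iam-proxy.dataplatform.vngcloud.tech/accounts-api/v2/auth/token
```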

**Provisioning via a Kubernetes Secret:**

```bash
kubectl -n airflow create secret generic vng-cloud-conn \
  --from-literal=AIRFLOW_CONN_VNG_CLOUD_DEFAULT='{"conn_type":"generic","host":"https://dev-iam-proxy.dataplatform.vngcloud.tech","login":"<CID>","password":"<CSECRET>","extra":{"data_platform_url":"https://dev-backend-proxy.dataplatform.vngcloud.tech"}}'
```

```yaml
# override.yaml
env:
  - name: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
    valueFrom:
      secretKeyRef:
        name: vng-cloud-conn
        key: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
```
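Hand-writing the JSON inside a shell string is error-prone: a quote character in the client secret breaks the command. A small helper can build the value with `json.dumps` and quote it with `shlex.quote` instead. The field names mirror the `kubectl` example above; the helper name `build_conn_json` is hypothetical.

```python
import json
import shlex


def build_conn_json(client_id: str, client_secret: str,
                    iam_host: str, data_platform_url: str) -> str:
    """Serialize the Connection fields into the AIRFLOW_CONN_* JSON format."""
    return json.dumps({
        "conn_type": "generic",
        "host": iam_host,
        "login": client_id,
        "password": client_secret,
        "extra": {"data_platform_url": data_platform_url},
    })


value = build_conn_json(
    "<CID>", "<CSECRET>",
    "https://dev-iam-proxy.dataplatform.vngcloud.tech",
    "https://dev-backend-proxy.dataplatform.vngcloud.tech",
)
# shlex.quote escapes any quotes in the JSON so the command is safe to paste.
print("kubectl -n airflow create secret generic vng-cloud-conn "
      "--from-literal=AIRFLOW_CONN_VNG_CLOUD_DEFAULT=" + shlex.quote(value))
```

With real credentials, prefer running the generated command directly over leaving it in shell history or logs.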

### Option 2 — Airflow Variables

Quicker to set up, but **not encrypted by default**. Suitable for development or small teams.

**UI: Admin → Variables → Add**

| Variable name           | Required | Default                                                 |
| ----------------------- | -------- | ------------------------------------------------------- |
| `vng_client_id`         | Yes      | —                                                       |
| `vng_client_secret`     | Yes      | —                                                       |
| `vng_iam_host`          | No       | `https://dev-iam-proxy.dataplatform.vngcloud.tech`      |
| `vng_token_path`        | No       | `/accounts-api/v2/auth/token`                           |
| `vng_data_platform_url` | No       | `https://dev-backend-proxy.dataplatform.vngcloud.tech/` |
| `vng_fe_url`            | No       | `https://dev-app.dataplatform.vngcloud.tech`            |

**CLI:**

```bash
airflow variables set vng_client_id "<CLIENT_ID>"
airflow variables set vng_client_secret "<CLIENT_SECRET>"
```
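To set all six Variables at once, you can write them to a flat JSON file and load it with `airflow variables import <file>`. A minimal sketch that generates such a file; the helper name and the filename `vng_variables.json` are arbitrary, and the optional values are the defaults from the table above.

```python
import json
from pathlib import Path


def write_variables_file(path: Path, client_id: str, client_secret: str) -> Path:
    """Write the Variables from the table as a {name: value} JSON object."""
    variables = {
        "vng_client_id": client_id,
        "vng_client_secret": client_secret,
        "vng_iam_host": "https://dev-iam-proxy.dataplatform.vngcloud.tech",
        "vng_token_path": "/accounts-api/v2/auth/token",
        "vng_data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech/",
        "vng_fe_url": "https://dev-app.dataplatform.vngcloud.tech",
    }
    path.write_text(json.dumps(variables, indent=2))
    return path


out = write_variables_file(Path("vng_variables.json"), "<CLIENT_ID>", "<CLIENT_SECRET>")
print(f"airflow variables import {out}")
```

The file holds the client secret in plain text, so delete it once the import succeeds.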

### Option 3 — Environment variables (local development)

```bash
export VNG_CLIENT_ID="..."
export VNG_CLIENT_SECRET="..."
```

Only the two core credentials are read from the environment. To override endpoint URLs, use Option 1 or 2.

## Usage

```python
from datetime import datetime
from airflow import DAG
from dataplatform_airflow_plugin import DataPlatformOperator

with DAG(
    dag_id="example_dataplatform_spark_job",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    tags=["dataplatform"],
) as dag:

    run_job = DataPlatformOperator(
        task_id="run_spark_job",
        workspace_id="ws-abc-123",
        job_id="job-xyz-456",
        application_args=["--date", "{{ ds }}", "--mode", "prod"],
        polling_period_seconds=15,
        do_xcom_push=True,
    )
```

See [`dags/example_dataplatform.py`](./dags/example_dataplatform.py) for a complete example with templating and Variables, or [`dags/example_dataplatform_minimal.py`](./dags/example_dataplatform_minimal.py) for the minimal form.

### `DataPlatformOperator` parameters

| Parameter                | Required | Default                   | Description                                                |
| ------------------------ | -------- | ------------------------- | ---------------------------------------------------------- |
| `workspace_id`           | Yes      | —                         | Workspace identifier (templated).                          |
| `job_id`                 | Yes      | —                         | Spark Job identifier (templated).                          |
| `application_args`       | No       | `[""]`                    | `list[str]`, `dict`, or JSON string (templated).           |
| `vng_conn_id`            | No       | `vng_cloud_default`       | Airflow Connection identifier.                             |
| `token_url`              | No       | From Connection / default | Overrides the IAM token endpoint.                          |
| `data_platform_url`      | No       | From Connection / default | Overrides the Data Platform base URL.                      |
| `polling_period_seconds` | No       | `15`                      | Interval between job-state polls, in seconds.              |
| `do_xcom_push`           | No       | `False`                   | Push `workspace_id`, `job_id`, and `run_id` to XCom.       |

**Templated fields:** `workspace_id`, `job_id`, `application_args` — Jinja expressions such as `{{ ds }}`, `{{ params.x }}`, and `{{ var.value.* }}` are supported.

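**Template extensions:** `.json` is registered as a template extension, so if `application_args` is a string ending in `.json`, Airflow reads that file (resolved against the DAG folder or `template_searchpath`) and renders its contents as the argument value.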

**Job UI link:** when the job reaches a terminal state, the operator logs a link to the Data Platform UI:

```
[INFO] View logs on Data Platform UI: https://dev-app.dataplatform.vngcloud.tech/workspaces/<ws>/jobs/<job>
```

Override the UI base URL via the Connection's `fe_url` extra or the `vng_fe_url` Variable when promoting to production.
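The logged link follows the pattern shown above, so it can be rebuilt elsewhere (for example in a failure notification). A minimal sketch, assuming the link is simply `fe_url` joined with the workspace and job IDs; `job_ui_link` is a hypothetical helper, not part of the plugin.

```python
from urllib.parse import quote


def job_ui_link(fe_url: str, workspace_id: str, job_id: str) -> str:
    """Build a Data Platform UI link in the format shown in the log line."""
    base = fe_url.rstrip("/")  # tolerate a trailing slash in fe_url
    # Percent-encode the IDs in case they contain characters unsafe in a path.
    return f"{base}/workspaces/{quote(workspace_id, safe='')}/jobs/{quote(job_id, safe='')}"


print(job_ui_link("https://dev-app.dataplatform.vngcloud.tech/", "ws-abc-123", "job-xyz-456"))
# https://dev-app.dataplatform.vngcloud.tech/workspaces/ws-abc-123/jobs/job-xyz-456
```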

### XCom keys

| Key            | Constant                | Description           |
| -------------- | ----------------------- | --------------------- |
| `workspace_id` | `XCOM_WORKSPACE_ID_KEY` | Workspace identifier. |
| `job_id`       | `XCOM_JOB_ID_KEY`       | Spark Job identifier. |
| `run_id`       | `XCOM_RUN_ID_KEY`       | Run identifier.       |
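With `do_xcom_push=True`, a downstream task can read these keys with `ti.xcom_pull`. The sketch below is a callable you could pass to `PythonOperator`; the task ID `run_spark_job` comes from the Usage example, and `report_run` is a hypothetical name. The key strings could equally be taken from the `XCOM_*_KEY` constants above; they are written out so the snippet runs without the plugin installed.

```python
from typing import Any

UPSTREAM_TASK_ID = "run_spark_job"  # task_id from the Usage example


def report_run(ti: Any) -> str:
    """Summarize the identifiers pushed by the DataPlatformOperator task."""
    # Keys follow the XCom table above.
    workspace_id = ti.xcom_pull(task_ids=UPSTREAM_TASK_ID, key="workspace_id")
    job_id = ti.xcom_pull(task_ids=UPSTREAM_TASK_ID, key="job_id")
    run_id = ti.xcom_pull(task_ids=UPSTREAM_TASK_ID, key="run_id")
    summary = f"workspace={workspace_id} job={job_id} run={run_id}"
    print(summary)
    return summary


# Wiring (requires Airflow, so shown as comments). PythonOperator passes the
# task instance as `ti` because the parameter has that name.
#   from airflow.operators.python import PythonOperator  # Airflow 2.x
#   # Airflow 3: from airflow.providers.standard.operators.python import PythonOperator
#   report = PythonOperator(task_id="report_run", python_callable=report_run)
#   run_job >> report
```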

### Spark Job states

```python
from dataplatform_airflow_plugin import SparkJobState

SparkJobState.SUCCESS.is_final        # True
SparkJobState.SUCCESS.is_successful   # True
SparkJobState.RUNNING.is_final        # False
```

| State        | Final | Successful |
| ------------ | ----- | ---------- |
| `QUEUING`    | No    | —          |
| `SCHEDULING` | No    | —          |
| `PENDING`    | No    | —          |
| `RUNNING`    | No    | —          |
| `SUCCESS`    | Yes   | Yes        |
| `FAILED`     | Yes   | No         |
| `CANCELLED`  | Yes   | No         |
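The table can be mirrored in a few lines of plain Python, for example in unit tests that should not import the plugin. This is an illustrative re-creation named `SparkJobStateSketch`, not the plugin's `SparkJobState` source; only the `is_final` and `is_successful` properties come from the example above, and returning `False` from `is_successful` for non-final states is an assumption (the table leaves them blank).

```python
from enum import Enum


class SparkJobStateSketch(Enum):
    """Illustrative copy of the state table (not the plugin's own class)."""

    QUEUING = "QUEUING"
    SCHEDULING = "SCHEDULING"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"

    @property
    def is_final(self) -> bool:
        # SUCCESS, FAILED, and CANCELLED are the terminal states in the table.
        return self in {
            SparkJobStateSketch.SUCCESS,
            SparkJobStateSketch.FAILED,
            SparkJobStateSketch.CANCELLED,
        }

    @property
    def is_successful(self) -> bool:
        # Only SUCCESS counts; non-final states also report False here.
        return self is SparkJobStateSketch.SUCCESS


print([s.value for s in SparkJobStateSketch if s.is_final])
# ['SUCCESS', 'FAILED', 'CANCELLED']
```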

## Kubernetes (production)

For stable releases, **avoid `_PIP_ADDITIONAL_REQUIREMENTS`** — it slows pod startup and depends on network availability at every restart. Build a custom image instead:

```dockerfile
FROM apache/airflow:3.2.0
RUN pip install --no-cache-dir dataplatform-airflow-plugin==0.1.1
```

```bash
docker build -t <registry>/airflow-dataplatform:0.1.1 .
docker push <registry>/airflow-dataplatform:0.1.1
```

```yaml
# override.yaml
defaultAirflowRepository: <registry>/airflow-dataplatform
defaultAirflowTag: "0.1.1"

images:
  airflow:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.1"
    pullPolicy: IfNotPresent
  pod_template:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.1"
    pullPolicy: IfNotPresent
```

> **KubernetesExecutor:** the `pod_template` image must also include the plugin — task pods are spawned from this template.
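Before rolling the image out, it can help to confirm that the plugin is actually installed in it. A minimal standard-library check; one way to run it is to pipe it into the image, e.g. `docker run --rm -i <registry>/airflow-dataplatform:0.1.1 python - < check_plugin.py` (the script name and `check_plugin` function are arbitrary).

```python
from importlib import metadata
from importlib.util import find_spec

EXPECTED = "0.1.1"  # the version pinned in the Dockerfile above


def check_plugin(dist: str = "dataplatform-airflow-plugin",
                 module: str = "dataplatform_airflow_plugin",
                 expected: str = EXPECTED) -> list[str]:
    """Return a list of problems; an empty list means the image looks right."""
    problems = []
    try:
        installed = metadata.version(dist)  # reads the installed distribution
        if installed != expected:
            problems.append(f"{dist} is {installed}, expected {expected}")
    except metadata.PackageNotFoundError:
        problems.append(f"{dist} is not installed")
    # find_spec locates the top-level module without importing it.
    if find_spec(module) is None:
        problems.append(f"module {module} was not found on sys.path")
    return problems


print(check_plugin() or "plugin OK")
```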

## Development

```bash
git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
make install          # uv pip install -e ".[dev]"
make test             # pytest
make lint             # ruff + mypy
make format           # black + isort
make build            # build the wheel
```

### Project layout

```
dataplatform-airflow-plugin/
├── dataplatform_airflow_plugin/
│   ├── __init__.py                       # Package metadata and public exports
│   ├── plugin.py                         # AirflowPlugin registration (hooks + operators)
│   ├── dataplatform_operator.py          # DataPlatformOperator + SparkJobState
│   └── hook.py                           # VNGCloudHook (IAM, Data Platform API)
├── dags/
│   ├── example_dataplatform.py           # Full example with templating and Variables
│   └── example_dataplatform_minimal.py   # Minimal example
├── tests/
│   ├── test_state.py
│   └── test_operator.py
├── Makefile
├── pyproject.toml
└── README.md
```

### Release workflow

```bash
# 1. Bump the version in pyproject.toml and dataplatform_airflow_plugin/__init__.py
# 2. Build and publish
make build
uv publish --token "$PYPI_TOKEN"

# 3. Commit, tag, and push
git commit -am "Release v0.1.1"
git tag v0.1.1
git push origin main && git push origin v0.1.1
```
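Step 1 requires the same version in two files, which is easy to get wrong. A small pre-release check, assuming `pyproject.toml` has a static `version = "..."` line (normally under `[project]`) and `__init__.py` defines `__version__ = "..."`; both are assumptions about the project files. It uses regular expressions rather than `tomllib` so it also runs on Python 3.10.

```python
from __future__ import annotations

import re

# First top-level `version = "..."` line, normally the one under [project].
PYPROJECT_RE = re.compile(r'^version\s*=\s*"([^"]+)"', re.MULTILINE)
INIT_RE = re.compile(r'^__version__\s*=\s*["\']([^"\']+)["\']', re.MULTILINE)


def _first_group(pattern: re.Pattern[str], text: str) -> str | None:
    match = pattern.search(text)
    return match.group(1) if match else None


def check_versions(pyproject_text: str, init_text: str,
                   tag: str | None = None) -> list[str]:
    """Compare versions across files (and an optional vX.Y.Z tag)."""
    project_version = _first_group(PYPROJECT_RE, pyproject_text)
    package_version = _first_group(INIT_RE, init_text)
    problems = []
    if project_version is None:
        problems.append("no version found in pyproject.toml")
    if package_version is None:
        problems.append("no __version__ found in __init__.py")
    if project_version and package_version and project_version != package_version:
        problems.append(f"pyproject.toml has {project_version}, __init__.py has {package_version}")
    if tag and project_version and tag != f"v{project_version}":
        problems.append(f"tag {tag} does not match v{project_version}")
    return problems
```

Before `make build`, pass it the contents of `pyproject.toml` and `dataplatform_airflow_plugin/__init__.py` (plus the tag you are about to create); a non-empty list means step 1 is incomplete.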

## Troubleshooting

### `ModuleNotFoundError: dataplatform_airflow_plugin`

1. Confirm the package is installed:
   ```bash
   pip show dataplatform-airflow-plugin
   ```
2. On Kubernetes, check that the pod successfully ran `pip install` at startup:
   ```bash
   kubectl -n airflow logs deploy/airflow-scheduler -c scheduler | grep -iE "pip|dataplatform"
   ```
3. When using `_PIP_ADDITIONAL_REQUIREMENTS`, pip may reuse a cached build if the version in `pyproject.toml` is unchanged, so new code is not picked up. Bump the version (e.g. `0.1.1` → `0.1.2`) and update the pin in `_PIP_ADDITIONAL_REQUIREMENTS` to match, or switch to a custom image (see [Kubernetes (production)](#kubernetes-production)).

### Task pods (KubernetesExecutor) do not see the plugin

- Ensure `images.pod_template` uses the same image as the scheduler and workers.
- Alternatively, set `_PIP_ADDITIONAL_REQUIREMENTS` at the top-level `env:` so it propagates to task pods.

### Token request fails (HTTP 401/403)

- Confirm credentials are configured through the expected mechanism — see [Configuration](#configuration).
- Verify the `client_id` and `client_secret` in the VNG Cloud IAM console.
- Make sure the `client_id`, `client_secret`, and `iam_host` all belong to the same environment (dev or prod).
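To rule out the plugin itself, you can reproduce the token request by hand. The sketch below builds, without sending, a standard OAuth2 client-credentials request (RFC 6749, section 4.4) with HTTP Basic authentication, using only the standard library. Whether the VNG Cloud IAM endpoint expects exactly this form (rather than, say, credentials in the body or a JSON payload) is an assumption; compare with the plugin's `hook.py` before drawing conclusions. `build_token_request` is a hypothetical helper.

```python
import base64
from urllib.parse import urlencode
from urllib.request import Request


def build_token_request(iam_host: str, token_path: str,
                        client_id: str, client_secret: str) -> Request:
    """Build (but do not send) an OAuth2 client-credentials token request."""
    url = iam_host.rstrip("/") + "/" + token_path.lstrip("/")
    body = urlencode({"grant_type": "client_credentials"}).encode()
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
    return Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )


req = build_token_request("https://dev-iam-proxy.dataplatform.vngcloud.tech",
                          "/accounts-api/v2/auth/token", "<CID>", "<CSECRET>")
print(req.get_method(), req.full_url)
# To actually send it (network access required); a 401/403 is raised as
# urllib.error.HTTPError, whose .code and .read() show the server's answer:
#   from urllib.request import urlopen
#   with urlopen(req, timeout=10) as resp:
#       print(resp.status)
```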

### `VNG Cloud credentials are not configured`

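The plugin could not locate credentials through any of the three mechanisms. Check each in priority order. Run the environment-variable check inside the container that actually runs the task (a worker, or a task pod with KubernetesExecutor), because that is where the variables are read:

```bash
airflow connections get vng_cloud_default    # Option 1
airflow variables get vng_client_id          # Option 2
echo "$VNG_CLIENT_ID"                        # Option 3
```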

### DAG parse error: "Don't use runtime-varying value as argument in DAG constructor"

- Avoid calls such as `pendulum.today()`, `datetime.now()`, or `Variable.get(...)` directly inside `DAG(...)` or `Operator(...)` arguments.
- Use static values (`pendulum.datetime(2026, 1, 1, tz="UTC")`) or Jinja templates (`"{{ ds }}"`, `"{{ var.value.x }}"`).

## License

Apache License 2.0
