Metadata-Version: 2.4
Name: dataplatform-airflow-plugin
Version: 0.1.0
Summary: Apache Airflow plugin for orchestrating Spark Jobs on Data Platform.
Author-email: gada121982 <gada121982@gmail.com>
License: Apache-2.0
Keywords: airflow,airflow-plugin,data-platform,dataplatform,spark,vng-cloud
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: Apache Airflow
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.10
Requires-Dist: apache-airflow>=2.8.0
Requires-Dist: requests>=2.28.0
Provides-Extra: dev
Requires-Dist: black>=24.0.0; extra == 'dev'
Requires-Dist: isort>=5.12.0; extra == 'dev'
Requires-Dist: mypy>=1.8.0; extra == 'dev'
Requires-Dist: pytest-mock>=3.12.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.4.0; extra == 'dev'
Requires-Dist: types-requests; extra == 'dev'
Description-Content-Type: text/markdown

# DataPlatform Airflow Plugin

An Apache Airflow plugin for triggering and managing **Spark Jobs on GreenNode Data Platform (VNG Cloud)**.

The plugin provides `DataPlatformOperator`, which handles the full job lifecycle:

1. Obtain an IAM access token (OAuth2 client credentials, refreshed automatically)
2. Submit a Spark Job run
3. Poll the run status until the job finishes
4. Cancel the job if the Airflow task is killed
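The lifecycle above can be sketched as a plain polling loop. This is an illustration, not the operator's source: `run_to_completion`, `submit`, `get_state`, and `cancel` are hypothetical stand-ins for the real Data Platform API calls, and the final states come from the [Spark Job states](#spark-job-states) table. Cancellation is modelled with `try/finally` here. The real operator presumably relies on Airflow's `on_kill()` hook instead.

```python
import time
from typing import Callable

# Final states, copied from the "Spark Job states" table in this README.
FINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}


def run_to_completion(
    submit: Callable[[], str],        # hypothetical: submits a run, returns its run_id
    get_state: Callable[[str], str],  # hypothetical: returns the run's current state
    cancel: Callable[[str], None],    # hypothetical: asks the platform to cancel the run
    polling_period_seconds: float = 15,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Submit a run, poll until it is final, and cancel it if polling is interrupted."""
    run_id = submit()
    finished = False
    try:
        while True:
            state = get_state(run_id)
            if state in FINAL_STATES:
                finished = True
                break
            sleep(polling_period_seconds)
    finally:
        # Interrupted before a final state (e.g. the Airflow task was killed):
        # cancel the remote run so it does not keep running unattended.
        if not finished:
            cancel(run_id)
    if state != "SUCCESS":
        raise RuntimeError(f"Run {run_id} finished in state {state}")
    return run_id
```

Injecting `sleep` keeps the loop testable without real waits, and a run that ends in `FAILED` or `CANCELLED` surfaces as an exception so the Airflow task fails too.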

---

## Installation

**From PyPI (production):**

```bash
pip install dataplatform-airflow-plugin
```

**From source (local dev):**

```bash
git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
pip install -e ".[dev]"
```

Verify the installation:

```bash
pip show dataplatform-airflow-plugin
python -c "from dataplatform_airflow_plugin import DataPlatformOperator, VNGCloudHook; print('OK')"
```

Or check via the Airflow CLI:

```bash
airflow plugins
# The output should include this line:
# dataplatform | dataplatform-airflow-plugin==0.1.0: EntryPoint(name='dataplatform', value='dataplatform_airflow_plugin.plugin:DataPlatformPlugin', group='airflow.plugins')
```

### Installing on the Airflow Helm chart (dev)

In `values.yaml` / `override.yaml`:

```yaml
env:
  - name: _PIP_ADDITIONAL_REQUIREMENTS
    value: "dataplatform-airflow-plugin==0.1.0"
```

```bash
helm upgrade <release> . -n <namespace> -f override.yaml
```

Helm performs a rolling restart of the pods (scheduler/worker/triggerer/api-server), and each pod runs `pip install` to fetch the plugin from PyPI on startup.

### Installing on the Airflow Helm chart (production)

Build a custom image; see [Kubernetes (Production)](#kubernetes-production) below.

---

## Configuration

The plugin supports **3 ways** to set up credentials, resolved in the following priority order:

> **Connection** → **Variable** → **Env var**

The plugin looks for credentials in the Connection first, falls back to Variables if none are found, and finally checks environment variables. Pick **one of the three** to suit your needs.
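The lookup order can be illustrated with a small resolver. This sketch shows only the documented precedence: `resolve_credentials` is hypothetical, plain dicts stand in for the Airflow Connection (`login`/`password`) and Variables, and the exception type is an assumption (only the message appears under [Troubleshooting](#troubleshooting)). The real plugin reads these sources through Airflow's own APIs.

```python
import os
from typing import Mapping, Optional


def resolve_credentials(
    connection: Optional[Mapping[str, str]],
    variables: Mapping[str, str],
    env: Mapping[str, str] = os.environ,
) -> tuple[str, str]:
    """Hypothetical helper: return (client_id, client_secret), Connection -> Variable -> env."""
    # Option 1: Airflow Connection (Login = client id, Password = client secret).
    if connection and connection.get("login") and connection.get("password"):
        return connection["login"], connection["password"]
    # Option 2: Airflow Variables.
    if variables.get("vng_client_id") and variables.get("vng_client_secret"):
        return variables["vng_client_id"], variables["vng_client_secret"]
    # Option 3: environment variables.
    if env.get("VNG_CLIENT_ID") and env.get("VNG_CLIENT_SECRET"):
        return env["VNG_CLIENT_ID"], env["VNG_CLIENT_SECRET"]
    # Assumed exception type; the message matches the Troubleshooting section.
    raise ValueError("VNG Cloud credentials are not configured")
```

For example, `resolve_credentials(None, {"vng_client_id": "cid", "vng_client_secret": "cs"}, env={})` returns `("cid", "cs")` because no Connection is set.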

---

### Option 1: Airflow Connection ⭐ (recommended for production)

The password is encrypted with Fernet, Connections have a dedicated UI, and multiple environments are easy to manage (`vng_dev`, `vng_prod`, ...).

**UI: Admin → Connections → Add**

| Field           | Value                                                             |
| --------------- | ----------------------------------------------------------------- |
| Connection Id   | `vng_cloud_default` _(default; can be changed via `vng_conn_id`)_ |
| Connection Type | `Generic`                                                         |
| Host            | `https://dev-iam-proxy.dataplatform.vngcloud.tech`                |
| Login           | `<VNG_CLIENT_ID>`                                                 |
| Password        | `<VNG_CLIENT_SECRET>`                                             |
| Extra (JSON)    | _(optional; see below)_                                           |

**Extra** (JSON, optional; overrides the default URLs):

```json
{
  "data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech",
  "token_path": "/accounts-api/v2/auth/token",
  "fe_url": "https://dev-app.dataplatform.vngcloud.tech"
}
```
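One plausible way these Extra keys combine with the Connection Host is sketched below. It assumes the token URL is the Host followed by `token_path` and that missing keys fall back to the defaults listed in this README. `resolve_endpoints` is hypothetical, and the plugin's actual merging logic may differ.

```python
import json
from typing import Any, Mapping, Optional, Union

# Defaults copied from the Variables table in this README (dev environment).
DEFAULTS = {
    "data_platform_url": "https://dev-backend-proxy.dataplatform.vngcloud.tech",
    "token_path": "/accounts-api/v2/auth/token",
    "fe_url": "https://dev-app.dataplatform.vngcloud.tech",
}


def resolve_endpoints(
    host: str, extra: Optional[Union[str, Mapping[str, Any]]] = None
) -> dict[str, str]:
    """Hypothetical helper: merge a Connection Extra (JSON string or dict) over DEFAULTS."""
    if isinstance(extra, str):
        extra = json.loads(extra) if extra.strip() else {}
    merged = {**DEFAULTS, **(extra or {})}
    return {
        # Assumption: token URL = Host + token_path, joined with exactly one slash.
        "token_url": host.rstrip("/") + "/" + merged["token_path"].lstrip("/"),
        "data_platform_url": merged["data_platform_url"].rstrip("/"),
        "fe_url": merged["fe_url"].rstrip("/"),
    }
```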

**Set via a Kubernetes Secret (Helm):**

```bash
kubectl -n airflow create secret generic vng-cloud-conn \
  --from-literal=AIRFLOW_CONN_VNG_CLOUD_DEFAULT='{"conn_type":"generic","host":"https://dev-iam-proxy.dataplatform.vngcloud.tech","login":"<CID>","password":"<CSECRET>","extra":{"data_platform_url":"https://dev-backend-proxy.dataplatform.vngcloud.tech"}}'
```

```yaml
# override.yaml
env:
  - name: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
    valueFrom:
      secretKeyRef:
        name: vng-cloud-conn
        key: AIRFLOW_CONN_VNG_CLOUD_DEFAULT
```
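Hand-quoting JSON inside the `kubectl` command is error-prone, especially when the secret contains quotes or backslashes. The sketch below builds the `AIRFLOW_CONN_VNG_CLOUD_DEFAULT` value with `json.dumps`, so the escaping is always valid. `build_conn_value` is a hypothetical helper, and its fields mirror the example above.

```python
import json


def build_conn_value(client_id: str, client_secret: str, iam_host: str, data_platform_url: str) -> str:
    """Hypothetical helper: JSON-format Airflow connection for AIRFLOW_CONN_VNG_CLOUD_DEFAULT."""
    return json.dumps(
        {
            "conn_type": "generic",
            "host": iam_host,
            "login": client_id,
            "password": client_secret,
            "extra": {"data_platform_url": data_platform_url},
        }
    )


if __name__ == "__main__":
    # Replace the placeholders, then pass the output to
    # --from-literal=AIRFLOW_CONN_VNG_CLOUD_DEFAULT=...
    print(
        build_conn_value(
            "<CID>",
            "<CSECRET>",
            "https://dev-iam-proxy.dataplatform.vngcloud.tech",
            "https://dev-backend-proxy.dataplatform.vngcloud.tech",
        )
    )
```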

---

### Option 2: Airflow Variables (simple, similar to iomete)

Quick to set up without needing to understand Connections, but **not encrypted by default**. Suitable for dev or small teams.

**UI: Admin → Variables → Add**

| Variable name             | Required | Default                                                       |
| ------------------------- | -------- | ------------------------------------------------------------- |
| `vng_client_id`           | ✅       | —                                                             |
| `vng_client_secret`       | ✅       | —                                                             |
| `vng_iam_host`            | ❌       | `https://dev-iam-proxy.dataplatform.vngcloud.tech`            |
| `vng_token_path`          | ❌       | `/accounts-api/v2/auth/token`                                 |
| `vng_data_platform_url`   | ❌       | `https://dev-backend-proxy.dataplatform.vngcloud.tech/`       |
| `vng_fe_url`              | ❌       | `https://dev-app.dataplatform.vngcloud.tech`                  |

**Set via CLI:**

```bash
airflow variables set vng_client_id "<CLIENT_ID>"
airflow variables set vng_client_secret "<CLIENT_SECRET>"
```

---

### Option 3: Environment variables (local dev; not recommended for production)

```bash
export VNG_CLIENT_ID="..."
export VNG_CLIENT_SECRET="..."
```

Environment variables only cover the two core credentials. To override the URL endpoints, use a Connection or Variables.

---

## Usage

```python
from datetime import datetime
from airflow import DAG
from dataplatform_airflow_plugin import DataPlatformOperator

with DAG(
    dag_id="example_dataplatform_spark_job",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    tags=["dataplatform"],
) as dag:

    run_job = DataPlatformOperator(
        task_id="run_spark_job",
        workspace_id="ws-abc-123",
        job_id="job-xyz-456",
        application_args=["--date", "{{ ds }}", "--mode", "prod"],
        polling_period_seconds=15,
        do_xcom_push=True,
    )
```

See also [`dags/example_dataplatform.py`](./dags/example_dataplatform.py) (full example) or [`dags/example_dataplatform_minimal.py`](./dags/example_dataplatform_minimal.py) (minimal example).

### `DataPlatformOperator` parameters

| Parameter                | Required | Default                   | Description                                        |
| ------------------------ | -------- | ------------------------- | -------------------------------------------------- |
| `workspace_id`           | ✅       | —                         | Workspace ID (templated)                           |
| `job_id`                 | ✅       | —                         | Spark Job ID (templated)                           |
| `application_args`       | ❌       | `[""]`                    | `list[str]`, `dict`, or JSON string (templated)    |
| `vng_conn_id`            | ❌       | `vng_cloud_default`       | Airflow Connection ID                              |
| `token_url`              | ❌       | from Connection / default | Override the IAM token endpoint                    |
| `data_platform_url`      | ❌       | from Connection / default | Override the Data Platform base URL                |
| `polling_period_seconds` | ❌       | `15`                      | Interval in seconds between status polls           |
| `do_xcom_push`           | ❌       | `False`                   | Push `workspace_id`, `job_id`, `run_id` via XCom   |

**Templated fields**: `workspace_id`, `job_id`, `application_args`. These support Jinja expressions such as `{{ ds }}`, `{{ params.x }}`, and `{{ var.value.* }}`.

**Template extension**: `.json`. If a templated field's value is a path ending in `.json`, Airflow loads that file and renders it as the template.

**Logs URL**: when the job finishes (success or failure), the operator logs a link to the Data Platform UI where you can view the logs:

```
[INFO] View logs on Data Platform UI: https://dev-app.dataplatform.vngcloud.tech/workspaces/<ws>/jobs/<job>
```

When moving to prod, override the base URL via the Connection extra `fe_url` or the Variable `vng_fe_url`.
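To surface the same link elsewhere (for example, in a failure notification), you can reproduce the format of the sample log line. `build_logs_url` is a hypothetical helper, and the `/workspaces/<ws>/jobs/<job>` path is taken from the sample output. The real operator may build the URL differently.

```python
from urllib.parse import quote


def build_logs_url(fe_url: str, workspace_id: str, job_id: str) -> str:
    """Hypothetical helper: Data Platform UI link in the format of the sample log line."""
    base = fe_url.rstrip("/")
    # Percent-encode the IDs so unexpected characters cannot break the path.
    return f"{base}/workspaces/{quote(workspace_id, safe='')}/jobs/{quote(job_id, safe='')}"
```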

### XCom keys

| Key            | Constant                | Description  |
| -------------- | ----------------------- | ------------ |
| `workspace_id` | `XCOM_WORKSPACE_ID_KEY` | Workspace ID |
| `job_id`       | `XCOM_JOB_ID_KEY`       | Spark Job ID |
| `run_id`       | `XCOM_RUN_ID_KEY`       | Run ID       |
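A downstream task can read these values with Airflow's `TaskInstance.xcom_pull(task_ids=..., key=...)`. The sketch assumes the upstream task is `run_spark_job` from the Usage example with `do_xcom_push=True`, and it uses the literal key strings from the table. `summarize_run` is a hypothetical callable that you could pass to a `PythonOperator`. Airflow injects `ti` from the task context.

```python
from typing import Any, Protocol


class SupportsXComPull(Protocol):
    """The subset of Airflow's TaskInstance used here."""

    def xcom_pull(self, task_ids: str, key: str) -> Any: ...


def summarize_run(ti: SupportsXComPull, upstream_task_id: str = "run_spark_job") -> str:
    """Hypothetical callable: read DataPlatformOperator's XComs and build a summary line."""
    values = {
        key: ti.xcom_pull(task_ids=upstream_task_id, key=key)
        for key in ("workspace_id", "job_id", "run_id")
    }
    return "workspace={workspace_id} job={job_id} run={run_id}".format(**values)
```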

### Spark Job states

```python
from dataplatform_airflow_plugin import SparkJobState

SparkJobState.SUCCESS.is_final        # True
SparkJobState.SUCCESS.is_successful   # True
SparkJobState.RUNNING.is_final        # False
```

| State        | Final? | Success? |
| ------------ | ------ | -------- |
| `QUEUING`    | ❌     | —        |
| `SCHEDULING` | ❌     | —        |
| `PENDING`    | ❌     | —        |
| `RUNNING`    | ❌     | —        |
| `SUCCESS`    | ✅     | ✅       |
| `FAILED`     | ✅     | ❌       |
| `CANCELLED`  | ✅     | ❌       |
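Where the plugin is not installed (for example, in a standalone monitoring script), the table maps directly onto an enum with two properties. This is an illustrative re-implementation of the table, not the plugin's `SparkJobState` source. Prefer importing the real class when it is available.

```python
from enum import Enum


class JobState(str, Enum):
    """Mirror of the state table above (illustrative, not the plugin's class)."""

    QUEUING = "QUEUING"
    SCHEDULING = "SCHEDULING"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"

    @property
    def is_final(self) -> bool:
        # Terminal states: the run will not change state again.
        return self in {JobState.SUCCESS, JobState.FAILED, JobState.CANCELLED}

    @property
    def is_successful(self) -> bool:
        return self is JobState.SUCCESS
```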

---

## Kubernetes (Production)

Once a release is stable, **avoid** `_PIP_ADDITIONAL_REQUIREMENTS`: it slows pod startup and requires network access every time a pod starts. Build a custom image instead:

```dockerfile
FROM apache/airflow:3.2.0
RUN pip install --no-cache-dir dataplatform-airflow-plugin==0.1.0
```

```bash
docker build -t <registry>/airflow-dataplatform:0.1.0 .
docker push <registry>/airflow-dataplatform:0.1.0
```

```yaml
# override.yaml
defaultAirflowRepository: <registry>/airflow-dataplatform
defaultAirflowTag: "0.1.0"

images:
  airflow:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.0"
    pullPolicy: IfNotPresent
  pod_template:
    repository: <registry>/airflow-dataplatform
    tag: "0.1.0"
    pullPolicy: IfNotPresent
```

> **KubernetesExecutor**: the `pod_template` image must also contain the plugin, because task pods are created from this template.

---

## Development

```bash
git clone https://git.vngcloud.tech/dataplatform/dataplatform-airflow-plugin.git
cd dataplatform-airflow-plugin
make install          # uv pip install -e ".[dev]"
make test             # pytest
make lint             # ruff + mypy
make format           # black + isort
make build            # build wheel
```

### Project structure

```
dataplatform-airflow-plugin/
├── dataplatform_airflow_plugin/
│   ├── __init__.py                       # Package metadata, public exports
│   ├── plugin.py                         # AirflowPlugin registration (hooks + operators)
│   ├── dataplatform_operator.py          # DataPlatformOperator + SparkJobState
│   └── hook.py                           # VNGCloudHook (IAM, Data Platform API)
├── dags/
│   ├── example_dataplatform.py           # Full example (templating, Variables)
│   └── example_dataplatform_minimal.py   # Minimal example
├── tests/
│   ├── test_state.py
│   └── test_operator.py
├── Makefile
├── pyproject.toml
└── README.md
```

### Release workflow

```bash
# 1. Bump the version in pyproject.toml (e.g. 0.1.0 → 0.1.1)
# 2. Build + publish
make build
uv publish --token "$PYPI_TOKEN"

# 3. Commit + tag + push
git commit -am "Release v0.1.1"
git tag v0.1.1
git push origin main && git push origin v0.1.1
```

---

## Troubleshooting

**`ModuleNotFoundError: dataplatform_airflow_plugin`**

1. Verify that the package is installed:
   ```bash
   pip show dataplatform-airflow-plugin
   ```
2. On Kubernetes, check whether the pod installed the package successfully (`pip install` only runs at pod startup):
   ```bash
   kubectl -n airflow logs deploy/airflow-scheduler -c scheduler | grep -iE "pip|dataplatform"
   ```
3. If you use `_PIP_ADDITIONAL_REQUIREMENTS` and the version in `pyproject.toml` was not bumped, pip may reuse a cached build, so restarting the pod does not reinstall the new code. Bump the version (e.g. `0.1.0` → `0.1.1`) or switch to an image-based deployment (see [Kubernetes (Production)](#kubernetes-production)).

**Task pod (KubernetesExecutor) is missing the plugin**

- Make sure `images.pod_template` uses the same image as the scheduler/worker.
- Alternatively, set the `_PIP_ADDITIONAL_REQUIREMENTS` env var in the top-level `env:` so that it propagates to task pods.

**Token request fails (401/403)**

- Check the credentials in whichever method you set up (Connection / Variable / env). See the priority order in [Configuration](#configuration).
- Verify `client_id` / `client_secret` in the VNG Cloud IAM console.
- Confirm that `client_id` and `client_secret` belong to the same environment (dev / prod) and the same `iam_host`.

**`VNG Cloud credentials are not configured`**

The plugin could not find credentials through any of the three methods. Check them in priority order:

```bash
airflow connections get vng_cloud_default     # Option 1
airflow variables get vng_client_id           # Option 2
echo $VNG_CLIENT_ID                            # Option 3
```

**DAG parse error: "Don't use runtime-varying value as argument in Dag constructor"**

- Do not call `pendulum.today()`, `datetime.now()`, or `Variable.get(...)` directly in `DAG(...)` / `Operator(...)` arguments.
- Use static values (`pendulum.datetime(2026, 1, 1, tz="UTC")`) or Jinja templates (`"{{ ds }}"`, `"{{ var.value.x }}"`).

---

## License

Apache 2.0
