Metadata-Version: 2.4
Name: airflow-provider-yandex-realty
Version: 0.1.0
Summary: Apache Airflow provider for Yandex Realty Partner API — collect call statistics
Author-email: Michael Kozhin <michael@kozhin.cc>
License: MIT
Project-URL: Homepage, https://github.com/mkozhin/airflow-provider-yandex-realty
Project-URL: Documentation, https://github.com/mkozhin/airflow-provider-yandex-realty#readme
Project-URL: Repository, https://github.com/mkozhin/airflow-provider-yandex-realty
Project-URL: Changelog, https://github.com/mkozhin/airflow-provider-yandex-realty/blob/main/CHANGELOG.md
Project-URL: Issues, https://github.com/mkozhin/airflow-provider-yandex-realty/issues
Keywords: airflow,yandex,realty,provider,calls
Classifier: Framework :: Apache Airflow
Classifier: Framework :: Apache Airflow :: Provider
Classifier: Development Status :: 4 - Beta
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Internet :: WWW/HTTP
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: apache-airflow<3.0,>=2.9.1
Requires-Dist: requests>=2.28
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Dynamic: license-file

# airflow-provider-yandex-realty

Apache Airflow provider for the Yandex Realty Partner API. It collects call statistics from
Яндекс Недвижимость (Yandex Realty).

---

*Powered by [Claude Code](https://claude.ai/code)*

---

## Installation

```bash
pip install airflow-provider-yandex-realty
```

Requires Python 3.10+ and `apache-airflow>=2.9.1,<3.0`.

## Connection

Create an Airflow connection of type **HTTP** (`conn_id = yandex_realty_default` by default).

- **Password**: the OAuth token, stored **without** the `OAuth ` prefix (the code adds it when
  building the `Authorization: OAuth {token}` header).
- **Extra**: a JSON object with the partner key (`x_authorization`), the agency ID
  (`agency_id`), and either a single `client_id` or an `accounts` list.
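
As a rough sketch of why the token is stored without its prefix, the snippet below assembles request headers from the two credentials. `build_headers` is a hypothetical helper, not the provider's code, and the `X-Authorization` header name is an assumption inferred from the `x_authorization` extra key.

```python
def build_headers(token: str, x_authorization: str) -> dict[str, str]:
    """Illustrative only: assemble auth headers from the connection fields."""
    return {
        # The prefix is added here, so the stored token must not include it.
        "Authorization": f"OAuth {token}",
        # Assumption: the partner key is sent in an X-Authorization header.
        "X-Authorization": x_authorization,
    }


# "y0_example" is a placeholder token, not a real credential.
headers = build_headers("y0_example", "Vertis public-partner-...")
```

If the token were saved as `OAuth y0_example`, a helper like this would produce `OAuth OAuth y0_example`, which is why the prefix is left out of the Password field.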

### Multiple accounts

```json
{
  "x_authorization": "Vertis public-partner-...",
  "agency_id": "417938",
  "accounts": [
    {"client_id": "103575674"},
    {"client_id": "67890"}
  ]
}
```

### Single account

```json
{
  "x_authorization": "Vertis public-partner-...",
  "agency_id": "417938",
  "client_id": "103575674"
}
```
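
If you prefer to define the connection outside the UI, Airflow can also read a connection from an `AIRFLOW_CONN_<CONN_ID>` environment variable in JSON form. The sketch below only builds that value with the standard library; the password is a placeholder, and the single-account extra is copied from the example above.

```python
import json

# Extra fields as in the single-account example; the partner key stays elided.
extra = {
    "x_authorization": "Vertis public-partner-...",
    "agency_id": "417938",
    "client_id": "103575674",
}

conn = {
    "conn_type": "http",
    "password": "<oauth-token-without-prefix>",  # placeholder, not a real token
    "extra": extra,
}

# Airflow upper-cases the conn_id to form the variable name.
env_name = "AIRFLOW_CONN_" + "yandex_realty_default".upper()
env_value = json.dumps(conn)
print(f"export {env_name}='{env_value}'")
```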

Which account a task processes is decided by the operator's `account_id`
parameter — it is a **selector into the connection**, not a credential:

- **single-account** — omit `account_id` (leave it `None`); the `client_id` is
  read from the connection's top-level `client_id`.
- **multi-account** — you do **not** hardcode it. The DAG enumerates accounts
  from the connection with `list_accounts()` and passes each `account_id`
  automatically (it equals the sanitized `client_id`).
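
The selection rule above can be sketched as a lookup over the connection's extra. `resolve_client_id` and `sanitize` are hypothetical names, and the sketch assumes the account ID is derived with the same `[^\w-]` → `_` rule that the Output section describes for path components; the provider's actual logic may differ.

```python
import re
from typing import Optional


def sanitize(value: str) -> str:
    """Replace every character other than a word character or '-' with '_'."""
    return re.sub(r"[^\w-]", "_", value)


def resolve_client_id(extra: dict, account_id: Optional[str] = None) -> str:
    """Illustrative only: map an account_id selector to a client_id."""
    if account_id is None:
        # Single-account: read the top-level client_id.
        return str(extra["client_id"])
    # Multi-account: find the entry whose sanitized client_id matches.
    for account in extra.get("accounts", []):
        client_id = str(account["client_id"])
        if sanitize(client_id) == account_id:
            return client_id
    raise KeyError(f"account_id {account_id!r} not found in connection extra")


multi = {"accounts": [{"client_id": "103575674"}, {"client_id": "67890"}]}
single = {"client_id": "103575674"}
```

With these inputs, `resolve_client_id(single)` and `resolve_client_id(multi, "67890")` each return the matching `client_id`, while an unknown selector raises `KeyError`.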

## Usage

`YandexRealtyCallsOperator` collects calls for a date range, groups them by day,
and writes one file per day. **One task processes one account.**

### Single account

Point the operator at the connection — nothing else is needed; the account is
read from the connection's top-level `client_id`.

```python
from airflow_provider_yandex_realty.operators.calls import YandexRealtyCallsOperator

collect = YandexRealtyCallsOperator(
    task_id="collect_calls",
    conn_id="yandex_realty_default",
    date_from="{{ ds }}",
    date_to="{{ ds }}",
    base_dir="/tmp/yandex_realty",
    output_format="json",      # "json" (NDJSON) or "csv"
    add_snapshot_ts=True,      # inject the DAG-run start timestamp (JSON only)
)
```

### Multiple accounts

Don't hardcode `account_id`. Read the accounts from the connection with
`list_accounts()` and fan out one task per account, so each `account_id` is
taken from the connection rather than typed by hand:

```python
from airflow_provider_yandex_realty.accounts import list_accounts
from airflow_provider_yandex_realty.operators.calls import YandexRealtyCallsOperator

for account in list_accounts("yandex_realty_default"):
    YandexRealtyCallsOperator(
        task_id=f"collect_{account.id}",
        conn_id="yandex_realty_default",
        account_id=account.id,     # selector into extra.accounts, taken from the connection
        date_from="{{ ds }}",
        date_to="{{ ds }}",
    )
```

`date_from`, `date_to` and `conn_id` are templated. `execute` returns a list of
`{date, path, snapshot_ts}` entries (one per day written).
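
A downstream step might consume that return value roughly as follows. This is a sketch under stated assumptions: `count_calls` is a hypothetical helper, the files are assumed to be NDJSON (`output_format="json"`) and readable from where the code runs, and the demo record values are made up.

```python
import json
import tempfile
from pathlib import Path


def count_calls(entries: list[dict]) -> dict[str, int]:
    """Count NDJSON records per day, given a list shaped like execute()'s result."""
    counts: dict[str, int] = {}
    for entry in entries:
        total = 0
        with Path(entry["path"]).open(encoding="utf-8") as fh:
            for line in fh:
                if line.strip():
                    json.loads(line)  # raises if a line is not valid JSON
                    total += 1
        counts[entry["date"]] = total
    return counts


# Demo with a stand-in file; in a DAG the entries would come from the task's XCom.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "2024-01-01.json"
    path.write_text('{"call_duration": 42}\n{"call_duration": 7}\n', encoding="utf-8")
    counts = count_calls(
        [{"date": "2024-01-01", "path": str(path), "snapshot_ts": None}]
    )
```

Because Airflow pushes an operator's return value to XCom by default, a later task could obtain the same list with `ti.xcom_pull(task_ids="collect_calls")`.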

For a full multi-account example (collect → GCS → BigQuery + S3), see
`examples/bq_and_s3_multi_account_dag.py`.

## Output

- **Layout** — `{base_dir}/{account_id}/{safe_run_id}/{date}.{ext}`. The
  `account_id` segment is omitted when `account_id` is `None` (single-account).
  Each path component is sanitized (`[^\w-]` → `_`) so untrusted values cannot
  escape `base_dir`. `ext` is `json` or `csv`.
- **JSON** — newline-delimited JSON (NDJSON): one JSON object per line.
- **CSV** — a header row plus one row per call, all fields quoted.
- **Columns** — the 12 canonical `CALL_FIELDS`: `call_datetime`, `date`,
  `object_name`, `incoming_phone`, `internal_phone`, `wait_duration`,
  `call_duration`, `revenue`, `object_type`, `campaign_tariff`, `client_tariff`,
  `is_targeted`.
- **`snapshot_ts`** — added to each record only when `add_snapshot_ts=True`, and
  **only in JSON output**; the CSV schema never gains a `snapshot_ts` column.
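
The layout and sanitization rules above can be illustrated with a short sketch. `sanitize` and `output_path` are hypothetical helpers that mirror the description, not the provider's own functions, and the run ID is just an example value.

```python
import re
from pathlib import Path
from typing import Optional


def sanitize(component: str) -> str:
    """Replace every character other than a word character or '-' with '_'."""
    return re.sub(r"[^\w-]", "_", component)


def output_path(
    base_dir: str,
    run_id: str,
    date: str,
    ext: str,
    account_id: Optional[str] = None,
) -> Path:
    """Illustrative only: {base_dir}/{account_id}/{safe_run_id}/{date}.{ext}."""
    # The account segment is dropped entirely in single-account mode.
    parts = [] if account_id is None else [sanitize(account_id)]
    parts.append(sanitize(run_id))
    return Path(base_dir).joinpath(*parts, f"{sanitize(date)}.{ext}")


run_id = "scheduled__2024-01-01T00:00:00+00:00"
multi = output_path("/tmp/yandex_realty", run_id, "2024-01-01", "json", "103575674")
single = output_path("/tmp/yandex_realty", run_id, "2024-01-01", "csv")
```

Since `.` and `/` are both rewritten to `_`, a value such as `../../etc` becomes `______etc` and stays inside `base_dir`.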

## License

MIT
