Metadata-Version: 2.4
Name: py-flink-sql-gateway
Version: 0.1.0.dev0
Summary: A Python client for the Apache Flink SQL Gateway REST API
Author-email: Ilya Soin <ilya.soin@exness.com>
Requires-Python: >=3.11
Requires-Dist: httpx>=0.28.1
Description-Content-Type: text/markdown

# py-flink-sql-gateway

A lightweight Python driver for the **Apache Flink SQL Gateway**, implementing [PEP 249 (DB-API 2.0)](https://peps.python.org/pep-0249/).

## Installation

```bash
pip install py-flink-sql-gateway
```

## Quick start

```python
from flink_gateway import connect

with connect("http://localhost:8083") as conn:
    # Create a streaming source
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE orders (
                id INT NOT NULL,
                item STRING NOT NULL,
                created_at TIMESTAMP(6) NOT NULL,
                customer ROW<
                    first_name STRING NOT NULL,
                    last_name STRING NOT NULL,
                    age INT NOT NULL
                > NOT NULL,
                notes STRING
            ) WITH (
                'connector' = 'datagen',
                'rows-per-second' = '5',
                'fields.id.kind' = 'sequence',
                'fields.id.start' = '1',
                'fields.id.end' = '100',
                'fields.item.length' = '12',
                'fields.customer.first_name.length' = '8',
                'fields.customer.last_name.length' = '10',
                'fields.customer.age.min' = '21',
                'fields.customer.age.max' = '65',
                'fields.notes.length' = '12'
            )
        """)

    # Query and iterate
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, item, created_at, customer, notes AS note
            FROM orders
        """)
        for i, row in enumerate(cur):
            id_, item, created_at, customer, note = row
            print(
                f"{id_}\t{item}\t{created_at.isoformat()}\t"
                f"{customer['first_name']} {customer['last_name']} ({customer['age']})\t"
                f"{note or ''}"
            )
            if i >= 4:
                break
```

> **Tip:** `ROW` and `MAP` types arrive as Python `dict`, `ARRAY` arrives as `list`. Binary data is passed through as-is.

### Low-level REST access

If you need features beyond DB-API, use the exported client directly:

```python
from flink_gateway import FlinkSqlGatewayClient

with FlinkSqlGatewayClient("http://localhost:8083") as client:
    status = client.get_operation_status("session-handle", "operation-handle")
    print("current status:", status)
```

## Type mapping

Python uses `None` for SQL NULLs natively — no wrapper types needed.

| Flink Type                | Python Type        |
|---------------------------|--------------------|
| TINYINT / SMALLINT / INT  | `int`              |
| BIGINT / INTERVAL         | `int`              |
| FLOAT / DOUBLE            | `float`            |
| BOOLEAN                   | `bool`             |
| CHAR / VARCHAR / STRING   | `str`              |
| DECIMAL                   | `decimal.Decimal`  |
| DATE                      | `datetime.date`    |
| TIME                      | `datetime.time`    |
| TIMESTAMP / TIMESTAMP_LTZ | `datetime.datetime`|
| BINARY / VARBINARY        | raw (passthrough)  |
| ROW                       | `dict`             |
| MAP                       | `dict`             |
| ARRAY                     | `list`             |

---

## Development & tests

Requires Python 3.13+ and [uv](https://docs.astral.sh/uv/).

```bash
# Install dependencies
uv sync

# Run unit tests (no Docker needed)
uv run pytest tests/test_client.py tests/test_dbapi.py -v

# Run all tests including integration (requires Docker)
uv run pytest tests/ -v
```

Integration tests spin up a Flink cluster (JobManager + TaskManager + SQL Gateway) via [testcontainers-python](https://testcontainers-python.readthedocs.io/).

---

## License

MIT (see LICENSE)
