Metadata-Version: 2.4
Name: trino-spark-adapter
Version: 1.1.3
Summary: Distributed Spark and Trino/Starburst adapter with SQL/JSON DAG execution and Spark writes to Trino, Iceberg and Hive.
Author: Khalil Laghmari
License: Proprietary
Keywords: spark,trino,starburst,iceberg,hive,distributed
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: pandas>=1.5
Requires-Dist: python-dateutil>=2.8
Requires-Dist: trino>=0.328
Requires-Dist: pyspark>=3.3

# trino-spark-adapter

`trino-spark-adapter` is a Python package for building hybrid Spark and Trino/Starburst data pipelines.

It provides:

- distributed Trino/Starburst reads executed from Spark executors;
- Spark writes to Trino through distributed `INSERT` batches;
- Spark writes to Iceberg tables with Iceberg partition transforms and table properties;
- Spark writes to Hive-compatible tables or paths with Hive-style partitioning and bucketing;
- a DAG runner that executes `.sql` and `.json` files in alphanumeric order;
- AES-CBC helpers and Spark SQL UDF registration;
- class-based logging with one logger per component.

## Installation

```bash
pip install trino-spark-adapter==1.1.3
```

## Core concepts

A DAG is a folder containing SQL and JSON files. Files are executed in alphanumeric order. The suffix determines the execution mode.

| Suffix | Action |
| --- | --- |
| `.trino.sql` | Execute SQL statements on Trino/Starburst. |
| `.spark.sql` | Execute SQL statements on Spark. |
| `.trino_to_spark.json` | Load a Trino table or query into a Spark temporary view. |
| `.spark_to_trino.json` | Write a Spark view to Trino through distributed inserts. |
| `.spark_to_iceberg.json` | Write a Spark view to an Iceberg table with Spark Writer V2. |
| `.spark_to_hive.json` | Write a Spark view to a Hive-compatible table or path. |
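The table works as a lookup from file suffix to task type. The sketch below only illustrates that idea and is not the package's code. `task_type`, `classify_dag_files` and the task-type strings are hypothetical (only `trino_to_spark` appears in the log sample further down). It also assumes plain `sorted()` ordering, which is lexicographic: `10.x` sorts before `2.x`, so zero-padded prefixes such as `01.` and `02.` keep the intended order under either reading.

```python
from pathlib import Path

# Hypothetical mapping from file suffix to task type, mirroring the table above.
SUFFIX_TO_TASK = {
    ".trino.sql": "trino_sql",
    ".spark.sql": "spark_sql",
    ".trino_to_spark.json": "trino_to_spark",
    ".spark_to_trino.json": "spark_to_trino",
    ".spark_to_iceberg.json": "spark_to_iceberg",
    ".spark_to_hive.json": "spark_to_hive",
}


def task_type(file_name: str) -> str:
    """Return the task type for a DAG file name, or raise if the suffix is unknown."""
    for suffix, kind in SUFFIX_TO_TASK.items():
        if file_name.endswith(suffix):
            return kind
    raise ValueError(f"unsupported DAG file: {file_name}")


def classify_dag_files(file_names: list[str]) -> list[tuple[str, str]]:
    """Sort file names lexicographically and pair each one with its task type."""
    return [(name, task_type(name)) for name in sorted(file_names)]


def classify_dag_folder(folder: str) -> list[tuple[str, str]]:
    """Classify the regular files directly inside folder (raises on unknown suffixes)."""
    return classify_dag_files([p.name for p in Path(folder).iterdir() if p.is_file()])
```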

## Minimal DAG runner

```python
from trino_spark_adapter import DagRunner, SparkAESHelper, TrinoConnectionConfig

spark = SparkAESHelper.get_spark(
    app_name="trino_spark_adapter_job",
    register_aes=True,
    fail_if_missing_aes=False,
)

trino_config = TrinoConnectionConfig.from_env()

runner = DagRunner(
    spark=spark,
    trino_config=trino_config,
    params={"calculation_date": "2026-01-05"},
    reader_defaults={"fetch_size": 100_000, "num_ranges": 20, "num_partitions": 20},
)

results = runner.run_folder("dag")
```
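The `params` mapping presumably fills `{placeholder}` tokens in DAG files, such as `{start_date}` in the `.trino_to_spark.json` examples. The sketch below shows one way such substitution could work; it is an assumption, not the runner's documented behavior, and `render_value` and `load_task_config` are hypothetical names. Substituting into string values after JSON parsing keeps the braces of the JSON syntax itself from being read as placeholders.

```python
import json
from typing import Any


def render_value(value: Any, params: dict[str, Any]) -> Any:
    """Recursively substitute {name} placeholders in every string inside value."""
    if isinstance(value, str):
        return value.format_map(params)  # raises KeyError for a missing parameter
    if isinstance(value, list):
        return [render_value(item, params) for item in value]
    if isinstance(value, dict):
        return {key: render_value(item, params) for key, item in value.items()}
    return value  # numbers, booleans and null are left unchanged


def load_task_config(text: str, params: dict[str, Any]) -> dict[str, Any]:
    """Parse a DAG JSON file, then render placeholders in its string values."""
    return render_value(json.loads(text), params)
```

Failing fast with `KeyError` on a missing parameter is a design choice of this sketch; a silent empty substitution would produce a valid but wrong query.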

## Trino configuration

`TrinoConnectionConfig.from_env()` reads:

```bash
export TRINO_HOST="starburst.example.com"
export TRINO_USER="user"
export TRINO_PASSWORD="password"
export TRINO_ROLES="role"
export TRINO_VERIFY="false"
export TRINO_HTTP_SCHEME="https"
export TRINO_PORT="443"
```
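As a hedged illustration of how these variables might be read, the hypothetical helper below collects them into a settings dict. The defaults for port, scheme and verification are assumptions, not documented package behavior.

```python
import os
from typing import Any, Mapping, Optional


def trino_settings_from_env(env: Optional[Mapping[str, str]] = None) -> dict[str, Any]:
    """Collect the TRINO_* variables into a settings dict (hypothetical helper)."""
    env = os.environ if env is None else env
    return {
        "host": env["TRINO_HOST"],  # required: KeyError if missing
        "port": int(env.get("TRINO_PORT", "443")),  # assumed default
        "user": env["TRINO_USER"],  # required
        "password": env.get("TRINO_PASSWORD"),
        "roles": env.get("TRINO_ROLES"),
        "http_scheme": env.get("TRINO_HTTP_SCHEME", "https"),  # assumed default
        # Treat common "false" spellings as disabling TLS certificate verification.
        "verify": env.get("TRINO_VERIFY", "true").strip().lower() not in {"false", "0", "no"},
    }
```

With the `trino` client from the requirements, such settings could be passed to `trino.dbapi.connect(host=..., port=..., user=..., http_scheme=..., verify=..., auth=trino.auth.BasicAuthentication(user, password))`.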

## Spark and AES helper

`SparkAESHelper` creates a Spark session from environment variables whose names start with `spark.` and can optionally register AES UDFs, whose key and IV are read from the base64-encoded `aes_key_str` and `aes_iv_str` variables.

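```bash
# Bash cannot `export` names that contain dots, so pass the spark.* settings with `env`.
export aes_key_str="...base64..."
export aes_iv_str="...base64..."
env "spark.app.name=trino_spark_adapter_job" \
    "spark.sql.shuffle.partitions=200" \
    python job.py  # job.py: your Spark driver script
```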

```python
from trino_spark_adapter import SparkAESHelper

spark = SparkAESHelper.get_spark(register_aes=True)

spark.sql("SELECT aes_encrypt('abc') AS encrypted_value").show()
```
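A sketch of the `spark.*` collection step, assuming each matching variable is passed unchanged to the session builder. `spark_conf_from_env` and `build_spark_session` are hypothetical names, not package functions; the second one imports PySpark only when it is called.

```python
import os
from typing import Mapping, Optional


def spark_conf_from_env(env: Optional[Mapping[str, str]] = None) -> dict[str, str]:
    """Return the environment variables whose names start with 'spark.'."""
    env = os.environ if env is None else env
    return {key: value for key, value in env.items() if key.startswith("spark.")}


def build_spark_session(conf: Mapping[str, str]):
    """Apply conf to a SparkSession builder and return the session."""
    from pyspark.sql import SparkSession  # imported lazily so the sketch loads without Spark

    builder = SparkSession.builder
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```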

Registered functions:

- `aes_encrypt(value)`
- `aes_decrypt(value)`

## Date parameters

`DateUtils` computes date values for the placeholders used in DAG files.

```python
from trino_spark_adapter import DateUtils

weekday_dates = DateUtils.generate_weekday_dates_between_start_stop(
    start_dt="2026-01-01",
    stop_dt="2026-02-01",
    weekday=0,
)

du = DateUtils(weekday_dates[0])
params = du.to_params()
params.update({"calculation_date": du.today_tiret[:10]})
```

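Typical generated keys include `today_tiret`, `today_slash`, `last_day_tiret`, `last_week_tiret`, `last_month_tiret`, `last_quarter_tiret`, `last_semester_tiret`, and `last_year_tiret`. The `_tiret` suffix (French for "hyphen") marks hyphen-separated dates such as `2026-01-05`, and `_slash` marks slash-separated dates.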
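To make the `weekday` argument concrete, here is a hypothetical stdlib reimplementation of `generate_weekday_dates_between_start_stop`. It assumes Python's `date.weekday()` numbering (0 is Monday) and inclusive bounds; the package's exact conventions may differ.

```python
from datetime import date, timedelta


def weekday_dates(start_dt: str, stop_dt: str, weekday: int) -> list[str]:
    """List ISO dates between start_dt and stop_dt (inclusive) that fall on weekday."""
    start = date.fromisoformat(start_dt)
    stop = date.fromisoformat(stop_dt)
    # Jump forward to the first matching weekday, then step one week at a time.
    current = start + timedelta(days=(weekday - start.weekday()) % 7)
    dates = []
    while current <= stop:
        dates.append(current.isoformat())
        current += timedelta(weeks=1)
    return dates
```

Under these assumptions, the example range from 2026-01-01 to 2026-02-01 with `weekday=0` yields the Mondays 2026-01-05, 2026-01-12, 2026-01-19 and 2026-01-26.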

## `.trino_to_spark.json`

Load a complete table:

```json
{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_table"
}
```

Load a partitioned date range with distributed Trino queries:

```json
{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_view",
  "colname": "event_date",
  "coltype": "DATE",
  "format": "%Y-%m-%d",
  "rounding": "D",
  "colname_start_value": "{start_date}",
  "colname_stop_value": "{end_date}",
  "num_ranges": 20
}
```
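One plausible way to turn `colname_start_value`, `colname_stop_value` and `num_ranges` into independent Trino queries is to split the date interval into contiguous sub-ranges, one per Spark task. This sketch assumes a half-open `[start, stop)` interval split at day granularity (the `"rounding": "D"` case); `split_date_range` and `range_predicates` are hypothetical helpers, not the reader's code.

```python
from datetime import date, timedelta


def split_date_range(start: date, stop: date, num_ranges: int) -> list[tuple[date, date]]:
    """Split [start, stop) into at most num_ranges contiguous, non-overlapping day ranges."""
    total_days = (stop - start).days
    if total_days <= 0 or num_ranges <= 0:
        return []
    num_ranges = min(num_ranges, total_days)  # never create empty ranges
    base, extra = divmod(total_days, num_ranges)
    ranges, cursor = [], start
    for i in range(num_ranges):
        # The first `extra` ranges get one additional day so the days add up exactly.
        upper = cursor + timedelta(days=base + (1 if i < extra else 0))
        ranges.append((cursor, upper))
        cursor = upper
    return ranges


def range_predicates(colname: str, ranges: list[tuple[date, date]]) -> list[str]:
    """Build one Trino WHERE predicate per range using DATE literals."""
    return [
        f"{colname} >= DATE '{lo.isoformat()}' AND {colname} < DATE '{hi.isoformat()}'"
        for lo, hi in ranges
    ]
```

Each predicate could then be appended to a query such as `SELECT * FROM catalog.schema.source_table WHERE ...` and executed inside its own Spark partition, so the ranges are fetched in parallel by the executors.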

The runner creates or replaces the Spark temporary view named by `target_view`. When no view name is provided, it derives the view name from the file name. For example, `3.source.trino_to_spark.json` creates `source`.
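The naming rule in that example can be sketched as dropping the leading order prefix and the task suffix. This assumes the prefix is everything up to the first `.`; `view_name_from_file` is a hypothetical helper, not the runner's code.

```python
def view_name_from_file(file_name: str, suffix: str = ".trino_to_spark.json") -> str:
    """Derive a view name such as 'source' from '3.source.trino_to_spark.json'."""
    if not file_name.endswith(suffix):
        raise ValueError(f"{file_name!r} does not end with {suffix!r}")
    stem = file_name[: -len(suffix)]  # e.g. '3.source'
    _prefix, _, name = stem.partition(".")  # drop the ordering prefix, e.g. '3'
    if not name:
        raise ValueError(f"no view name in {file_name!r}")
    return name
```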

## `.spark_to_trino.json`

Write a Spark view to a Trino table. If the target table does not exist, it is created automatically with columns derived from the Spark schema.

```json
{
  "source_view": "prepared_view",
  "target_table": "catalog.schema.target_table",
  "repartition_by": ["entity_id"],
  "num_partitions": 40,
  "sort_by": ["entity_id"]
}
```
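As a hypothetical sketch of what one executor might do with its rows, the functions below render multi-row `INSERT ... VALUES` statements with escaped SQL literals. `sql_literal`, `insert_batches` and the batch size are assumptions; the package's actual batching and type handling may differ.

```python
from datetime import date, datetime
from itertools import islice
from typing import Any, Iterable, Iterator, Sequence


def sql_literal(value: Any) -> str:
    """Render a Python value as a Trino SQL literal (common scalar types only)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return repr(value)  # NaN and infinity are not handled in this sketch
    if isinstance(value, datetime):  # check datetime before date: it is a subclass
        return f"TIMESTAMP '{value.isoformat(sep=' ')}'"
    if isinstance(value, date):
        return f"DATE '{value.isoformat()}'"
    return "'" + str(value).replace("'", "''") + "'"  # double embedded single quotes


def insert_batches(
    table: str, columns: Sequence[str], rows: Iterable[Sequence[Any]], batch_size: int
) -> Iterator[str]:
    """Yield one INSERT statement per batch of up to batch_size rows."""
    column_list = ", ".join(columns)
    it = iter(rows)
    while batch := list(islice(it, batch_size)):
        values = ", ".join("(" + ", ".join(sql_literal(v) for v in row) + ")" for row in batch)
        yield f"INSERT INTO {table} ({column_list}) VALUES {values}"
```

In PySpark, a function like this could run inside `DataFrame.foreachPartition`, with each partition opening its own Trino connection and executing the generated statements; `repartition_by` and `num_partitions` would then presumably control how many inserts reach Trino concurrently.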

## `.spark_to_iceberg.json`

Write a Spark view to an Iceberg table through the configured Spark Iceberg catalog.

```json
{
  "source_view": "prepared_view",
  "catalog": "iceberg_catalog",
  "schema": "analytics",
  "table": "target_table",
  "mode": "append",
  "partition_spec": [
    {"transform": "day", "column": "event_ts"},
    {"transform": "bucket", "column": "entity_id", "num_buckets": 32}
  ],
  "distribution_mode": "hash",
  "format_version": 2,
  "file_format": "PARQUET",
  "repartition_by": ["entity_id"],
  "num_partitions": 200
}
```

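Common modes are `create`, `replace`, `append`, and `overwrite_partitions`. These names mirror the Spark `DataFrameWriterV2` methods `create()`, `replace()`, `append()`, and `overwritePartitions()`.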
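The `partition_spec` entries line up with Iceberg partition transforms. As a hedged sketch, the helpers below render them as a Spark SQL `PARTITIONED BY` clause for an Iceberg table and map the remaining options to the Iceberg table properties `format-version`, `write.format.default` and `write.distribution-mode`. Both helpers, the `identity` entry and the JSON-to-property mapping are assumptions, not the package's code.

```python
from typing import Any

# Time transforms use the plural function names accepted in Iceberg Spark DDL.
TIME_TRANSFORMS = {"year": "years", "month": "months", "day": "days", "hour": "hours"}


def partition_clause(spec: list[dict[str, Any]]) -> str:
    """Render partition_spec entries as a PARTITIONED BY clause."""
    parts = []
    for entry in spec:
        transform, column = entry["transform"], entry["column"]
        if transform in TIME_TRANSFORMS:
            parts.append(f"{TIME_TRANSFORMS[transform]}({column})")
        elif transform == "bucket":
            parts.append(f"bucket({entry['num_buckets']}, {column})")
        elif transform == "identity":
            parts.append(column)
        else:
            raise ValueError(f"unsupported transform: {transform}")
    return f"PARTITIONED BY ({', '.join(parts)})"


def table_properties(config: dict[str, Any]) -> dict[str, str]:
    """Map writer options to Iceberg table properties (assumed mapping)."""
    mapping = {
        "format_version": "format-version",
        "file_format": "write.format.default",
        "distribution_mode": "write.distribution-mode",
    }
    return {prop: str(config[key]).lower() for key, prop in mapping.items() if key in config}
```

With `DataFrameWriterV2`, the same layout corresponds to `df.writeTo("iceberg_catalog.analytics.target_table").using("iceberg").partitionedBy(days("event_ts"), bucket(32, "entity_id")).tableProperty("format-version", "2").create()`, where `days` and `bucket` come from `pyspark.sql.functions`.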

## `.spark_to_hive.json`

Write a Spark view to a Hive table:

```json
{
  "source_view": "prepared_view",
  "table": "analytics.target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"],
  "repartition_by": ["event_date"],
  "num_partitions": 40
}
```

Write a Spark view to a path such as S3A:

```json
{
  "source_view": "prepared_view",
  "path": "s3a://bucket/path/target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"]
}
```

Hive bucketing is supported only with `table`, which is written with `saveAsTable`; Spark does not support `bucketBy` for path-based writes:

```json
{
  "source_view": "prepared_view",
  "table": "analytics.bucketed_table",
  "mode": "overwrite",
  "format": "parquet",
  "bucket_by": ["entity_id"],
  "num_buckets": 32,
  "sort_by": ["entity_id"]
}
```
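These constraints can be checked before any Spark job starts. The validator below is a hypothetical sketch, not part of the package. It assumes that exactly one of `table` or `path` must be set and that `bucket_by` requires both `table` and a positive `num_buckets`, matching Spark's rule that `bucketBy` works only with `saveAsTable`.

```python
from typing import Any


def validate_hive_config(config: dict[str, Any]) -> list[str]:
    """Return the problems found in a .spark_to_hive.json config (empty if valid)."""
    problems = []
    has_table, has_path = "table" in config, "path" in config
    if has_table == has_path:
        problems.append("set exactly one of 'table' or 'path'")
    if config.get("bucket_by"):
        if not has_table:
            problems.append("'bucket_by' requires 'table' (Spark buckets only with saveAsTable)")
        num_buckets = config.get("num_buckets")
        if not isinstance(num_buckets, int) or num_buckets <= 0:
            problems.append("'bucket_by' requires a positive integer 'num_buckets'")
    return problems
```

In PySpark, the bucketed example corresponds to `df.write.mode("overwrite").format("parquet").bucketBy(32, "entity_id").sortBy("entity_id").saveAsTable("analytics.bucketed_table")`; calling `save()` on a path after `bucketBy` raises an error in Spark.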

## Logging

Every main component inherits from `LogBase` and exposes a class logger.

```python
import logging
from trino_spark_adapter import DagRunner, DistributedTrinoSparkReader, SparkHiveWriter

DagRunner.logger().setLevel(logging.INFO)
DistributedTrinoSparkReader.logger().setLevel(logging.DEBUG)
SparkHiveWriter.logger().setLevel(logging.INFO)
```

The default formatter includes timestamp, class name and level:

```text
[2026-01-05 09:15:12] [DagRunner] [INFO] START task type=trino_to_spark file=1.source.trino_to_spark.json
[2026-01-05 09:17:03] [DagRunner] [INFO] SUCCESS task type=trino_to_spark file=1.source.trino_to_spark.json elapsed=0:01:51
```
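A class-level logger with this format can be built on the standard `logging` module. The sketch below is an assumption about how a base like `LogBase` could work, not its source: `DemoLogBase`, `ExampleComponent` and the default INFO level are hypothetical, and the handler is attached only once per logger name.

```python
import logging
import sys


class DemoLogBase:
    """Hypothetical base class: each subclass gets a logger named after the class."""

    FORMAT = "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s"
    DATEFMT = "%Y-%m-%d %H:%M:%S"

    @classmethod
    def logger(cls) -> logging.Logger:
        log = logging.getLogger(cls.__name__)
        if not log.handlers:  # configure only on first use, so later setLevel calls stick
            handler = logging.StreamHandler(sys.stderr)
            handler.setFormatter(logging.Formatter(cls.FORMAT, datefmt=cls.DATEFMT))
            log.addHandler(handler)
            log.setLevel(logging.INFO)  # assumed default level
            log.propagate = False  # avoid duplicate lines through the root logger
        return log


class ExampleComponent(DemoLogBase):
    pass
```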

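Guard debug-only Spark actions explicitly. Logging arguments are evaluated before the level check, so `logger.debug("row_count=%s", df.count())` would run a full Spark job even when DEBUG output is disabled:

```python
import logging

from trino_spark_adapter import DagRunner

logger = DagRunner.logger()
if logger.isEnabledFor(logging.DEBUG):
    # df is the Spark DataFrame being inspected; count() triggers a Spark job.
    logger.debug("row_count=%s", df.count())
```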

## Publishing

The source archive includes the `.pypi.sh` publishing script. Make it executable and run it:

```bash
chmod +x .pypi.sh
./.pypi.sh
```


## Resuming or partially executing a DAG

`DagRunner.run_folder` can execute only part of a DAG folder. This is useful
when a long run fails after several successful files and you want to restart
from the failing task.

```python
runner.run_folder(
    "dag",
    start_from_file="6.transform.spark.sql",
)
```

You can also start from a zero-based index; `start_from_index=5` starts at the sixth file in alphanumeric order:

```python
runner.run_folder(
    "dag",
    start_from_index=5,
)
```

For short validation runs, stop after a specific file or index:

```python
runner.run_folder(
    "dag",
    stop_after_file="6.transform.spark.sql",
)
```

To run only a chosen subset, provide explicit file names. The files are still
executed in the DAG folder's alphanumeric order:

```python
runner.run_folder(
    "dag",
    files=[
        "6.transform.spark.sql",
        "7.export.spark_to_iceberg.json",
    ],
)
```
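The selection options can be read as filters over the sorted file list. The hypothetical `select_files` helper below sketches that reading only. It assumes inclusive `start_from_*` and `stop_after_*` bounds, a zero-based `start_from_index`, that `files` takes precedence over the other options, and that an unknown file name is an error; the real `run_folder` may combine the options differently.

```python
from typing import Optional, Sequence


def select_files(
    all_files: Sequence[str],
    start_from_file: Optional[str] = None,
    start_from_index: Optional[int] = None,
    stop_after_file: Optional[str] = None,
    files: Optional[Sequence[str]] = None,
) -> list[str]:
    """Return the DAG files to execute, always in alphanumeric order."""
    ordered = sorted(all_files)
    if files is not None:
        unknown = set(files) - set(ordered)
        if unknown:
            raise ValueError(f"unknown DAG files: {sorted(unknown)}")
        wanted = set(files)
        return [name for name in ordered if name in wanted]  # keep folder order
    start = 0
    if start_from_file is not None:
        start = ordered.index(start_from_file)  # ValueError if the file is absent
    elif start_from_index is not None:
        start = start_from_index
    stop = len(ordered)
    if stop_after_file is not None:
        stop = ordered.index(stop_after_file) + 1  # inclusive upper bound
    return ordered[start:stop]
```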
