Metadata-Version: 2.4
Name: trino-spark-adapter
Version: 1.0.0
Summary: Lecture Trino/Starburst distribuée avec Spark, exécution DAG SQL/JSON et écritures Spark vers Trino, Iceberg ou fichiers.
Author: Khalil Laghmari
License: Proprietary
Keywords: spark,trino,starburst,iceberg,hive,distributed
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: pandas>=1.5
Requires-Dist: python-dateutil>=2.8
Requires-Dist: trino>=0.328
Requires-Dist: pyspark>=3.3

# trino-spark-adapter

`trino-spark-adapter` est une librairie Python/Spark pour industrialiser des traitements hybrides Spark + Trino/Starburst.

Elle permet de :

- lire des tables ou requêtes Trino en parallèle depuis un cluster Spark ;
- construire automatiquement des splits de lecture sur une colonne de partitionnement ;
- exposer une source Trino comme vue temporaire Spark ;
- exécuter un dossier de DAG contenant des fichiers SQL et JSON dans l'ordre alphanumérique ;
- exécuter du SQL pur côté Trino ou côté Spark ;
- écrire une vue Spark vers une table Trino avec création automatique de la table si elle n'existe pas ;
- écrire une vue Spark vers Iceberg via le catalog Spark ;
- écrire une vue Spark vers un chemin fichier local, HDFS, S3/S3A, ABFS ou GCS selon la configuration Spark ;
- utiliser un helper AES-CBC compatible UDF Spark pour manipuler des données chiffrées.

La version `1.0.0` correspond à une première version packagée stable, volontairement conservatrice : le code modulaire récent est gardé, les anciennes versions monolithiques ne sont pas incluses, et les modifications sont limitées aux points critiques nécessaires pour un usage en package.

---

## Installation

Depuis PyPI :

```bash
pip install trino-spark-adapter
```

Depuis une wheel locale :

```bash
pip install trino_spark_adapter-1.0.0-py3-none-any.whl
```

Depuis le zip source :

```bash
pip install trino_spark_adapter_v1_0_0.zip
```

## Dépendances principales

Le package déclare notamment :

- `pyspark>=3.3`
- `trino>=0.328`
- `pandas>=1.5`
- `python-dateutil>=2.8`

Pour écrire dans S3, Iceberg, Hive ou un object storage, les jars et configurations Spark doivent être fournis dans la `SparkSession` par votre environnement d'exécution.

---

## Configuration Trino

La configuration Trino peut être construite depuis les variables d'environnement :

```bash
export TRINO_HOST="starburst.company.net"
export TRINO_USER="user"
export TRINO_PASSWORD="password"
export TRINO_ROLES="system=role"
export TRINO_VERIFY="false"
export TRINO_HTTP_SCHEME="https"
export TRINO_PORT="443"
```

```python
from trino_spark_adapter import TrinoConnectionConfig

trino_config = TrinoConnectionConfig.from_env()
```

ou passée explicitement :

```python
trino_config = {
    "host": "starburst.company.net",
    "user": "user",
    "password": "password",
    "roles": "system=role",
    "verify": False,
    "http_scheme": "https",
    "port": 443,
}
```

---

## Lecture distribuée Trino -> Spark

### Lecture simple d'une table complète

```python
from trino_spark_adapter import DistributedTrinoSparkReader

reader = DistributedTrinoSparkReader(
    spark=spark,
    query_limit0="SELECT * FROM hive.schema.table LIMIT 0",
    queries=["SELECT * FROM hive.schema.table"],
    host=host,
    user=user,
    password=password,
    roles=roles,
    verify=False,
    fetch_size=1_000_000,
)

df = reader.spark_df
df.createOrReplaceTempView("ma_vue")
```

### Lecture partitionnée par colonne

```python
from trino_spark_adapter import DistributedTrinoSparkReaderBuilder

reader = DistributedTrinoSparkReaderBuilder(
    spark=spark,
    table_fullname="hive.analytics.events",
    colname="event_date",
    coltype="DATE",
    colname_start_value="2026-01-01",
    colname_stop_value="2026-01-31",
    num_ranges=20,
    format="%Y-%m-%d",
    rounding="D",
    host=host,
    user=user,
    password=password,
    roles=roles,
    verify=False,
)

df = reader.spark_df
df.createOrReplaceTempView("events")
```

La librairie génère plusieurs requêtes Trino et les exécute via Spark afin que chaque partition Spark lise une partie de la source.

---

## Écriture Spark -> Trino avec création automatique

`DistributedTrinoSparkWriter` écrit un `DataFrame` Spark vers une table Trino en générant des `INSERT INTO ... VALUES (...)` par partition Spark.

La table est créée automatiquement avant l'insertion avec un `CREATE TABLE IF NOT EXISTS` construit à partir du schéma Spark. L'utilisateur n'a pas besoin de mettre `create_table: true`.

```python
from trino_spark_adapter import DistributedTrinoSparkWriter

writer = DistributedTrinoSparkWriter(
    spark=spark,
    target_table="hive.analytics.result_table",
    host=host,
    user=user,
    password=password,
    roles=roles,
    verify=False,
    columns=["id", "event_date", "amount"],
    repartition_by=["event_date"],
    num_partitions=40,
    initial_batch_rows=1000,
    max_sql_chars=1_000_000,
)

summary = writer.write(df)
```

Options utiles :

- `columns` : sous-ensemble et ordre des colonnes à écrire ;
- `table_properties` : propriétés de table Trino/Hive/Iceberg si supportées par le connecteur ;
- `create_table_sql` : DDL explicite optionnel, seulement si vous voulez remplacer le DDL généré ;
- `repartition_by`, `num_partitions`, `sort_by`, `coalesce_to` : optimisation Spark avant écriture ;
- `initial_batch_rows`, `max_sql_chars`, `max_retries_per_batch` : contrôle des batches SQL.

---

## Écriture Spark -> Iceberg via Spark Catalog

Pour les tables Iceberg, le chemin recommandé est souvent d'écrire directement avec Spark via un catalog Iceberg déjà configuré.

```python
from trino_spark_adapter import DistributedSparkToTrinoWriter

writer = DistributedSparkToTrinoWriter(
    spark=spark,
    catalog="iceberg_prod",
    schema="analytics",
    table="kpi_events",
    mode="create",
    partition_spec=[
        {"transform": "day", "column": "event_ts"},
        {"transform": "bucket", "column": "id", "num_buckets": 32},
    ],
    repartition_by=["id"],
    num_partitions=200,
    distribution_mode="hash",
    format_version=2,
    file_format="PARQUET",
)

writer.write(df)
```

Modes supportés :

- `create`
- `replace`
- `append`
- `overwrite_partitions`

---

## Exécution d'un dossier DAG

Le `DagRunner` exécute les fichiers d'un dossier dans l'ordre alphanumérique.

```python
from trino_spark_adapter import DagRunner, TrinoConnectionConfig

params = {
    "current_dt": "2026-05-31",
    "last_semester_dt": "2025-11-30",
}

runner = DagRunner(
    spark=spark,
    trino_config=TrinoConnectionConfig.from_env(),
    params=params,
    reader_defaults={
        "fetch_size": 1_000_000,
        "num_partitions": 20,
        "num_ranges": 20,
    },
)

results = runner.run_folder("dag")
```

Exemple de dossier :

```text
 dag/
 ├── 0.trino.sql
 ├── 1.detaillants_points_vente.trino_to_spark.json
 ├── 2.individus_detaillants.trino_to_spark.json
 ├── 3.paiements_encrypted.trino_to_spark.json
 ├── 4.paiements_enrichis.spark.sql
 ├── 5.resultats.spark_to_trino.json
 └── 6.export_kpis.spark_to_file.json
```

Extensions supportées :

| Extension | Action |
|---|---|
| `.trino.sql` | Exécute chaque requête SQL sur Trino/Starburst |
| `.spark.sql` | Exécute chaque requête SQL sur Spark |
| `.trino_to_spark.json` | Lit une source Trino et crée une vue temporaire Spark |
| `.spark_to_trino.json` | Écrit une vue Spark vers une table Trino ou Iceberg |
| `.spark_to_file.json` | Écrit une vue Spark vers un chemin fichier |

Les fichiers SQL peuvent contenir plusieurs requêtes séparées par `;`.

Les fichiers SQL et JSON supportent `str.format(**params)`, par exemple `{current_dt}` ou `{last_semester_dt}`.

---

## `.trino_to_spark.json`

### Table complète

```json
{
  "table_fullname": "lab_dsec_iceberg.lab_mnp_dsec_fraud.t_detaillants_points_vente_1"
}
```

Avec un fichier nommé :

```text
1.detaillants_points_vente.trino_to_spark.json
```

la vue Spark créée automatiquement sera :

```text
detaillants_points_vente
```

### Table filtrée/splittée par colonne de date

```json
{
  "table_fullname": "lab_dsec_hive.lab_mnp_dsec_fraud.t_b_sugar_paiementbeneficiaire_1",
  "colname": "partition_reception_date",
  "coltype": "DATE",
  "format": "%Y-%m-%d",
  "rounding": "D",
  "colname_start_value": "{last_semester_dt}",
  "colname_stop_value": "{current_dt}",
  "num_ranges": 20
}
```

### Requête Trino comme source

```json
{
  "target_view": "paiements_filtrees",
  "query_base": "SELECT id, partition_reception_date, amount FROM hive.schema.payments WHERE country = 'FR'",
  "colname": "partition_reception_date",
  "coltype": "DATE",
  "format": "%Y-%m-%d",
  "rounding": "D",
  "colname_start_value": "2026-01-01",
  "colname_stop_value": "2026-01-31",
  "num_ranges": 20
}
```

---

## `.spark_to_trino.json`

Ce fichier écrit une vue temporaire Spark vers Trino. La création de la table cible est automatique.

```json
{
  "source_view": "paiements_enrichis",
  "target_table": "lab_dsec_hive.lab_mnp_dsec_fraud.t_paiements_enrichis_result",
  "columns": ["id", "event_date", "amount"],
  "repartition_by": ["event_date"],
  "num_partitions": 40,
  "initial_batch_rows": 1000,
  "max_sql_chars": 1000000,
  "fail_fast": true
}
```

Le runner fait automatiquement :

1. `df = spark.table(source_view)` ;
2. génération du `CREATE TABLE IF NOT EXISTS` depuis le schéma Spark ;
3. exécution du DDL sur Trino ;
4. inserts distribués par partitions Spark.

### Mode Iceberg via Spark Catalog

Si vous voulez écrire via Spark/Iceberg plutôt que par `INSERT` Trino :

```json
{
  "writer": "iceberg",
  "source_view": "paiements_enrichis",
  "catalog": "iceberg_prod",
  "schema": "analytics",
  "table": "paiements_enrichis",
  "mode": "append",
  "partition_spec": [
    {"transform": "day", "column": "event_ts"},
    {"transform": "bucket", "column": "id", "num_buckets": 32}
  ],
  "repartition_by": ["id"],
  "num_partitions": 200,
  "distribution_mode": "hash",
  "format_version": 2,
  "file_format": "PARQUET"
}
```

---

## `.spark_to_file.json`

Ce fichier écrit une vue temporaire Spark vers un chemin cible. Le format par défaut est `parquet`.

Spark utilise sa propre configuration pour accéder au stockage. Si la session Spark contient déjà les secrets S3/S3A ou les credentials cloud, il suffit donc de fournir le chemin cible.

### Export parquet simple

```json
{
  "source_view": "beneficiaire_kpis",
  "path": "s3a://my-bucket/fraud/beneficiaire_kpis"
}
```

### Export partitionné avec overwrite

```json
{
  "source_view": "beneficiaire_kpis",
  "path": "s3a://my-bucket/fraud/beneficiaire_kpis",
  "format": "parquet",
  "mode": "overwrite",
  "partition_by": ["event_date"],
  "repartition_by": ["event_date"],
  "num_partitions": 40,
  "options": {
    "compression": "snappy"
  }
}
```

Champs supportés :

- `source_view` : vue Spark à écrire ;
- `path`, `target_path` ou `output_path` : chemin cible ;
- `format` : `parquet` par défaut, mais aussi `csv`, `json`, `orc`, etc. selon Spark ;
- `mode` : mode Spark, par défaut `errorifexists`; exemples : `overwrite`, `append`, `ignore` ;
- `partition_by` : colonnes de partitionnement fichier ;
- `options` : options du writer Spark ;
- `repartition_by`, `num_partitions`, `sort_by`, `coalesce_to` : optimisation avant écriture.

---

## Chiffrement AES

Le package expose `PyAES`, utilisable directement ou dans une UDF Spark.

```python
from trino_spark_adapter import PyAES

crypto = PyAES(key_str=base64_key, iv_str=base64_iv)
enc = crypto.encrypt("hello")
plain = crypto.decrypt(enc)
```

Exemple UDF :

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from trino_spark_adapter import PyAES

crypto = PyAES(key_str=base64_key, iv_str=base64_iv)
spark.udf.register("aes_decrypt", lambda x: crypto.decrypt(x), StringType())

spark.sql("SELECT aes_decrypt(encrypted_col) FROM my_view")
```

---

## Recommandations de production

- Pour les gros volumes Spark -> table, privilégier Iceberg via Spark Catalog quand c'est possible.
- Pour Spark -> Trino par `INSERT VALUES`, ajuster `num_partitions`, `initial_batch_rows` et `max_sql_chars`.
- Garder les secrets dans la configuration Spark, les variables d'environnement ou un secret manager, pas dans les fichiers DAG.
- Définir explicitement `target_view` dans les JSON si le nom de vue ne doit pas être déduit du nom du fichier.
- Versionner le dossier `dag/` avec des préfixes numériques pour garantir l'ordre d'exécution.

---

## API principale

```python
from trino_spark_adapter import (
    DagRunner,
    TrinoConnectionConfig,
    DistributedTrinoSparkReader,
    DistributedTrinoSparkReaderBuilder,
    DistributedTrinoSparkWriter,
    DistributedSparkToTrinoWriter,
    SparkFileWriter,
    PyAES,
)
```

---

## Statut

`1.0.0` est la première version packagée destinée à être publiée sur PyPI.
