Metadata-Version: 2.4
Name: e6data-spark-compatibility
Version: 1.0.0
Summary: PySpark and Sedona compatibility layer for e6data
Home-page: https://github.com/e6data/e6-spark-compat
Author: E6Data
Author-email: support@e6data.com
Project-URL: Bug Reports, https://github.com/e6data/e6-spark-compat/issues
Project-URL: Source, https://github.com/e6data/e6-spark-compat
Project-URL: Documentation, https://github.com/e6data/e6-spark-compat/blob/main/README.md
Keywords: pyspark spark sql dataframe e6data compatibility sedona spatial
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: System Administrators
Classifier: Topic :: Database
Classifier: Topic :: Database :: Database Engines/Servers
Classifier: Topic :: Scientific/Engineering :: GIS
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Operating System :: OS Independent
Requires-Python: >=3.7
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: e6data-python-connector>=2.3.10
Requires-Dist: pandas>=1.0.0
Requires-Dist: numpy>=1.18.0
Provides-Extra: dev
Requires-Dist: pytest>=6.0; extra == "dev"
Requires-Dist: pytest-cov>=2.0; extra == "dev"
Requires-Dist: pytest-html>=3.0; extra == "dev"
Requires-Dist: black>=22.0; extra == "dev"
Requires-Dist: flake8>=3.8; extra == "dev"
Requires-Dist: mypy>=0.910; extra == "dev"
Provides-Extra: spatial
Requires-Dist: geopandas>=0.10.0; extra == "spatial"
Requires-Dist: shapely>=1.8.0; extra == "spatial"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: license-file
Dynamic: project-url
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# e6-spark-compat

A PySpark and Apache Sedona compatibility layer for e6data that lets existing PySpark code run against e6data with minimal changes.

## Overview

e6-spark-compat is a production-ready compatibility library that translates PySpark and Apache Sedona operations into optimized e6data SQL queries. It implements lazy evaluation, SQLGlot-based SQL generation, and broad PySpark API compatibility.

### Key Features

- **Lazy Evaluation**: Build query plans that execute only when actions are called
- **SQLGlot SQL Generation**: AST-based query construction and optimization for high-performance SQL
- **Complete PySpark API**: DataFrame operations, SQL functions, window functions, and aggregations
- **Spatial Operations**: Full Apache Sedona compatibility with ST_* functions
- **Window Functions**: Complete Window specification API for advanced analytics
- **Multiple File Formats**: Parquet, ORC, CSV, JSON, and GeoParquet support
- **Dual-Mode Test Suite**: 1270 tests run offline (SQL validation) and live (cluster execution)
- **CI/CD**: GitHub Actions with offline (automatic) and live (manual) test runs

## Installation

### For Users

```bash
# Install from PyPI
pip install e6data-spark-compatibility

# Install from GitHub
pip install git+https://github.com/e6data/e6-spark-compat.git

# Install from local clone
git clone https://github.com/e6data/e6-spark-compat.git
cd e6-spark-compat
pip install -e .
```

### For Developers

```bash
# Clone the repository
git clone https://github.com/e6data/e6-spark-compat.git
cd e6-spark-compat

# Install with development dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/
```

## Quick Start

### Migration from PySpark

Simply change your import statements; everything else stays the same:

```python
# Before: PySpark imports
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import col, upper, count, sum, row_number
# from pyspark.sql.window import Window

# After: e6-spark-compat imports
from e6_spark_compat import SparkSession
from e6_spark_compat.sql.functions import col, upper, count, sum, row_number
from e6_spark_compat.sql.window import Window

# Connection configuration
spark = (SparkSession.builder
    .appName("E6DataExample")
    .config("spark.e6data.host", "your-host")
    .config("spark.e6data.username", "your-username")
    .config("spark.e6data.password", "your-password")
    .config("spark.e6data.database", "your-database")
    .config("spark.e6data.catalog", "your-catalog")
    .config("spark.e6data.cluster", "your-cluster-name")
    .config("spark.e6data.secure", True)
    .getOrCreate())

# All PySpark operations work identically
df = spark.read.parquet("s3://bucket/path/to/data.parquet")

# Transformations (lazy evaluation)
result = (df.filter(col("age") > 21)
    .select("name", upper(col("city")).alias("city_upper"), "salary")
    .groupBy("city_upper")
    .agg(
        count("*").alias("total_people"),
        sum("salary").alias("total_salary")
    )
    .orderBy(col("total_people").desc()))

# Action triggers execution
result.show()
```

### Window Functions

```python
from e6_spark_compat.sql.window import Window
from e6_spark_compat.sql.functions import row_number, rank, lag, sum

# Window specifications
window_spec = Window.partitionBy("department").orderBy("salary")
frame_spec = (Window.partitionBy("team").orderBy("hire_date")
    .rowsBetween(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW))

# Window functions
df_with_analytics = df.select(
    "*",
    row_number().over(window_spec).alias("rank"),
    lag("salary", 1).over(window_spec).alias("prev_salary"),
    sum("salary").over(frame_spec).alias("running_total")
)
```

## Spatial Support

For spatial operations with Sedona compatibility:

```python
from e6_spark_compat.sedona import SedonaRegistrator
from e6_spark_compat.sql.functions import expr

# Register Sedona functions
SedonaRegistrator.registerAll(spark)

# Use spatial functions
points_df = spark.read.parquet("s3://bucket/points.parquet")
polygons_df = spark.read.parquet("s3://bucket/polygons.parquet")

# Spatial join (alias the DataFrames so expr() can reference their columns by name)
result = points_df.alias("p").join(
    polygons_df.alias("g"),
    expr("ST_Contains(g.geometry, ST_Point(p.lon, p.lat))"),
    "inner"
)
```

## Comprehensive Feature Set

### DataFrame Operations
- **Transformations**: `select()`, `filter()`/`where()`, `join()`, `groupBy()`, `orderBy()`/`sort()`, `limit()`, `distinct()`
- **Set Operations**: `union()`, `intersect()`, `exceptAll()`
- **Column Operations**: `withColumn()`, `withColumnRenamed()`, `drop()`, `cast()`
- **Caching**: `cache()`/`persist()`, `unpersist()`
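
A minimal sketch combining several of these operations (the column names are illustrative):

```python
from e6_spark_compat.sql.functions import col

# Derive a column, rename it, drop the original, and deduplicate.
# All of this is lazy: nothing executes until an action is called.
shaped = (df
    .withColumn("salary_k", (col("salary") / 1000).cast("double"))
    .withColumnRenamed("salary_k", "salary_thousands")
    .drop("salary")
    .distinct())
```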

### Action Methods
- **Data Retrieval**: `collect()`, `count()`, `first()`, `take()`, `head()`
- **Display**: `show()`, `explain()`, `describe()`
- **Export**: `toPandas()`, `write.parquet()`, `write.orc()`, `write.csv()`, `write.json()`
- **Views**: `createOrReplaceTempView()`, `createGlobalTempView()`
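
Continuing the sketch above, actions are what trigger execution (the output path is illustrative):

```python
# Actions execute the generated SQL on e6data
total = shaped.count()        # row count as an int
preview = shaped.take(5)      # first 5 rows
pdf = shaped.toPandas()       # materialize as a pandas DataFrame

# Write results out and expose them as a temp view
shaped.write.parquet("s3://bucket/output/shaped.parquet")
shaped.createOrReplaceTempView("shaped_view")
```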

### SQL Functions (135+)
- **String Functions**: `upper`, `lower`, `concat`, `substring`, `trim`, `split`, `regexp_replace`, `length`
- **Math Functions**: `abs`, `round`, `floor`, `ceil`, `sqrt`, `pow`, `sin`, `cos`, `log`
- **Aggregate Functions**: `sum`, `avg`, `count`, `min`, `max`, `first`, `last`, `collect_list`
- **Date/Time Functions**: `year`, `month`, `dayofmonth`, `hour`, `minute`, `date_add`, `date_sub`, `datediff`
- **Window Functions**: `row_number`, `rank`, `dense_rank`, `percent_rank`, `ntile`, `lag`, `lead`
- **Conditional Functions**: `when`, `coalesce`, `isnull`, `isnan`, `greatest`, `least`
- **Hash Functions**: `md5`, `sha1`, `sha2`, `hash`, `xxhash64`
- **Array Functions**: `array`, `array_contains`, `explode`, `size`, `array_sort`, and 16 more
- **Map Functions**: `map_keys`, `map_values`, `map_from_arrays`, `map_concat`
- **JSON Functions**: `get_json_object`, `json_extract`, `from_json`, `to_json`
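
A short sketch using a handful of these (the columns are hypothetical):

```python
from e6_spark_compat.sql.functions import when, coalesce, col, year, datediff

df_enriched = df.select(
    "name",
    year(col("hire_date")).alias("hire_year"),
    # datediff(end, start), as in PySpark
    datediff(col("review_date"), col("hire_date")).alias("tenure_days"),
    when(col("salary") > 100000, "senior").otherwise("junior").alias("band"),
    # fall back to base_bonus when bonus is NULL
    coalesce(col("bonus"), col("base_bonus")).alias("bonus_filled"),
)
```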

### Spatial Operations (Sedona Compatible)
- **Geometry Functions**: All ST_* functions including `ST_Point`, `ST_Polygon`, `ST_Buffer`
- **Spatial Relationships**: `ST_Contains`, `ST_Intersects`, `ST_Distance`, `ST_Within`
- **Transformations**: `ST_Transform`, `ST_Centroid`, `ST_ConvexHull`
- **Indexing**: H3 and GeoHash spatial indexing support
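
For example, a proximity filter built from the listed functions (the geometry column and coordinates are illustrative):

```python
from e6_spark_compat.sql.functions import expr

# Buffer each geometry, then keep rows near a reference point
buffered = df.withColumn("buffered_geom", expr("ST_Buffer(geometry, 0.01)"))
nearby = buffered.filter(expr("ST_Distance(geometry, ST_Point(-73.98, 40.75)) < 0.1"))
```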

## Testing

### Test Suite Overview

The test suite uses **dual-mode execution**: tests run offline by default (mock connection, SQL validation) and can also execute on a live e6data cluster when credentials are provided.

| Category | Tests | Description |
|---|---|---|
| E2E SQL Generation | 360 | Full pipeline: PySpark API → SQL string validation |
| TPC-DS Translations | 37 | Standard benchmark queries (37 of 99 TPC-DS queries) |
| TPC-DS Analytics | 22 | Complex analytics workloads (star schema, RFM, ETL pipelines) |
| SQLGlot Expressions | 288 | SQL expression generation via SQLGlot |
| Integration | 227 | Query plan node composition |
| Unit | 200 | Individual class/method tests |
| Writer | 38 | DataFrameWriter operations |
| Workloads | 94 | Customer workload pipeline tests (Kroger DQ + DFM analytics) |
| **Total** | **1270** | **1263 pass, 7 known failures** |

### Customer Scripts

Standalone scripts that exercise real customer pipelines end-to-end:

| Script | Description | Dataset |
|---|---|---|
| `kroger_original_test.py` | Kroger pre-DQ validation pipeline | mars_full |
| `e6_dfm_on_mars_full.py` | DFM pipeline patterns (39 tests) | mars_full |
| `e6_dfm_test.py` | DFM feature coverage (42 tests) | TPC-DS |
| `*_rust.py` variants | Same scripts with a FORMAT_NUMBER workaround for the rust engine | same as above |

```bash
# Run customer script on Java engine
E6DATA_HOST=... S3_READ_BASE=... python tests/customer_scripts/kroger_original_test.py

# Run on rust engine
E6DATA_HOST=... S3_READ_BASE=... python tests/customer_scripts/kroger_original_test_rust.py
```

### Running Tests

```bash
# Offline (default) — fast, no cluster needed (~5 seconds)
./run_tests.sh

# Specific file or directory
./run_tests.sh offline 1 tests/workloads/
./run_tests.sh offline 1 tests/workloads/test_dfm_pipeline.py
./run_tests.sh offline 1 tests/workloads/test_dfm_pipeline.py::TestDFMMissingFunctions

# By keyword or marker
./run_tests.sh offline 1 -k "dfm"
./run_tests.sh offline 1 -m e2e

# Or use pytest directly
pytest tests/e2e/                    # E2E tests
pytest tests/tpcds/                  # TPC-DS queries
pytest tests/e2e/ -m spatial         # Spatial tests
```

### Live Mode (cluster execution)

Set environment variables to run the same tests against a real e6data cluster:

```bash
export E6DATA_HOST="your-cluster-host"
export E6DATA_PORT="443"
export E6DATA_USERNAME="your-user"
export E6DATA_PASSWORD="your-token"
export E6DATA_DATABASE="tpcds_1000_delta"
export E6DATA_CATALOG="your-catalog"
export E6DATA_CLUSTER="your-cluster"

# Sequential with full HTML report
./run_tests.sh live

# Parallel with N workers
./run_tests.sh live 4
./run_tests.sh live 8
```

In live mode, each test generates SQL (same as offline) **and** executes it on the cluster with `LIMIT 5`. The HTML report captures query IDs, row counts, execution times, and error details.

### Test Reports

Reports are auto-generated on every run:

- **HTML**: `reports/test_report.html` — browsable report with expandable details per test:
  - PySpark input source code
  - Generated SQL
  - Query ID (live mode)
  - Engine error details (live mode, failures only)
- **JUnit XML**: `reports/test_results.xml` — for CI/CD integration

### CI / GitHub Actions

Offline tests run automatically on every push/PR — no setup needed.

Live tests are triggered manually (requires GitHub secrets configured):

**From GitHub UI:**
1. Go to **Actions** tab → **e6-spark-compat CI**
2. Click **Run workflow**
3. Select branch, cluster (`general` / `rust-azure-benchmark`), and worker count
4. Click **Run workflow**

**From CLI:**
```bash
gh workflow run tests.yml --ref main -f run_live=true -f cluster=general -f concurrency=4
```

The CI summary includes category breakdown tables and failure details, and `dorny/test-reporter` renders an interactive test results tab.

## Architecture

### Core Components

1. **Query Plan Tree**: AST-like nodes representing SQL operations (Filter, Project, Join, Aggregate, etc.)
2. **SQLGlot Integration**: SQL generation and optimization via AST transformations
3. **Lazy Evaluation**: Operations build query plans without immediate execution
4. **Connection Management**: Efficient e6data connection handling and query execution

### Execution Flow

1. **Build Phase**: DataFrame operations create query plan nodes
2. **Optimization Phase**: SQLGlot optimizes the query plan using AST transformations
3. **Generation Phase**: Query plans generate optimized SQL via SQLGlot
4. **Execution Phase**: Actions trigger SQL execution on e6data
5. **Result Phase**: Results returned in PySpark-compatible format
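
A short sketch of that lifecycle (the path is illustrative):

```python
from e6_spark_compat.sql.functions import col

# Build phase: transformations only assemble query plan nodes
plan = (spark.read.parquet("s3://bucket/events.parquet")
    .filter(col("event_type") == "click")
    .select("user_id", "ts"))

# Inspect the plan and its generated SQL without executing anything
plan.explain()

# Execution phase: the action sends the optimized SQL to e6data
rows = plan.collect()
```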

## Development

### Project Structure

```
e6_spark_compat/
├── core/           # Session, connection, query plan, SQL generator
├── sql/            # DataFrame, Column, functions, window, reader, writer, types
├── spatial/        # Sedona-compatible ST_* functions
└── sedona/         # SedonaRegistrator compatibility shim

tests/
├── e2e/              # E2E SQL generation tests (dual-mode: offline + live)
├── tpcds/            # TPC-DS benchmark query translations (37 queries)
├── integration/      # Query plan composition tests
├── sqlglot_tests/    # SQLGlot expression tests
├── unit/             # Unit tests
├── workloads/        # Customer workload pipeline tests
├── customer_scripts/ # Standalone customer pipeline scripts
└── live/             # Live cluster-only execution tests
```

### Code Quality

```bash
# Format code
black e6_spark_compat/

# Run linting
flake8 e6_spark_compat/

# Type checking
mypy e6_spark_compat/
```

## Migration Guide

### From PySpark

1. **Update Imports**: Change `pyspark` imports to `e6_spark_compat`
2. **Configuration**: Update SparkSession configuration for e6data connection
3. **Test**: Run your existing PySpark code; it should work unchanged

### From Sedona

1. **Spatial Functions**: Replace Sedona imports with `e6_spark_compat.spatial.functions`
2. **Registration**: Use `SedonaRegistrator.registerAll(spark)` for compatibility
3. **Geometry Types**: Spatial operations work identically to Sedona
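
A sketch of the import swap, assuming the spatial module mirrors Sedona's ST_* names:

```python
# Before (Sedona):
# from sedona.sql.st_functions import ST_Contains, ST_Point

# After:
from e6_spark_compat.spatial.functions import ST_Contains, ST_Point
from e6_spark_compat.sedona import SedonaRegistrator

SedonaRegistrator.registerAll(spark)  # registers ST_* functions, as shown above
```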

## Contributing

We welcome contributions! Here's how to get started:

1. Fork the repository
2. Create a feature branch
3. Add tests for new functionality
4. Ensure all tests pass (`pytest tests/`)
5. Submit a pull request

## Support

- **Documentation**: See `docs/pyspark-compatibility/` for detailed guides
- **Issues**: Report bugs and feature requests on GitHub
- **Contact**: support@e6data.com for enterprise support

## License

Apache License 2.0 - see [LICENSE](LICENSE) for details.
