Metadata-Version: 2.4
Name: sparkless-testing
Version: 0.2.0
Summary: Easy testing with sparkless or PySpark on demand
Author-email: Odos Matthews <odosmatthews@gmail.com>
Maintainer-email: Odos Matthews <odosmatthews@gmail.com>
License: MIT
Project-URL: Homepage, https://github.com/eddiethedean/sparkless-testing
Project-URL: Repository, https://github.com/eddiethedean/sparkless-testing
Project-URL: Issues, https://github.com/eddiethedean/sparkless-testing/issues
Keywords: spark,pyspark,testing,pytest,sparkless,data-engineering,unit-testing,fixtures
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Topic :: Software Development :: Testing
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pytest>=7.0.0
Provides-Extra: mock
Requires-Dist: sparkless>=3.19.0; extra == "mock"
Provides-Extra: pyspark
Requires-Dist: pyspark<3.6.0,>=3.5.0; extra == "pyspark"
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Requires-Dist: ruff>=0.4.0; extra == "dev"
Requires-Dist: sparkless>=3.19.0; extra == "dev"
Requires-Dist: pyspark<3.6.0,>=3.5.0; extra == "dev"
Dynamic: license-file

# sparkless-testing

> **Easy testing with sparkless or PySpark on demand**

A Python package that simplifies running tests with either **sparkless** (mock) or **PySpark** (real) engines on demand. Write tests once, run them with either engine seamlessly.

## Why sparkless-testing?

- 🚀 **Fast Mock Testing**: Use `sparkless` for lightning-fast unit tests without JVM overhead
- 🔄 **Real Engine Validation**: Test against real PySpark to catch integration issues
- 🎯 **Single Test Suite**: Write tests once, run with both engines automatically
- 🔧 **Zero Configuration**: Automatic engine detection and session management
- ⚡ **Parallel Ready**: Optimized for parallel test execution with pytest-xdist

## Features

- **Automatic Engine Detection** - Finds the installed engines and configures them without manual setup
- **Pytest Fixtures** - Ready-to-use fixtures for both engines
- **Session Management** - Automatic session creation, cleanup, and isolation
- **Test Utilities** - Helpers for common test patterns
- **Parametrization Support** - Run tests with both engines automatically
- **Parallel Testing Support** - Optimized for parallel test execution with pytest-xdist

## Installation

```bash
pip install sparkless-testing
```

For specific engines:

```bash
# Install with mock engine (sparkless)
pip install sparkless-testing[mock]

# Install with PySpark engine
pip install sparkless-testing[pyspark]

# Install with both engines plus development tools (pytest-cov, mypy, ruff)
pip install sparkless-testing[dev]
```

## Quick Start

### Basic Usage

```python
import pytest
from sparkless_testing import pytest_fixtures

# Use the spark_session fixture
def test_my_function(spark_session):
    df = spark_session.createDataFrame([{"id": 1, "name": "Alice"}])
    assert df.count() == 1
    assert df.collect()[0]["name"] == "Alice"
```

### Explicit Engine Selection

```python
def test_mock_only(mock_spark_session):
    # Only runs with sparkless
    df = mock_spark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1

def test_pyspark_only(pyspark_session):
    # Only runs with PySpark
    df = pyspark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1
```

### Using Functions and Types

```python
def test_with_functions(spark_session, spark_functions, spark_types):
    F = spark_functions
    Types = spark_types
    
    df = spark_session.createDataFrame([{"name": "Alice"}])
    result = df.select(F.upper(F.col("name"))).collect()
    assert result[0][0] == "ALICE"
```

**Note**: Always use `spark_functions` together with `spark_session`. PySpark functions like `F.col()` require an active SparkContext, which is provided by the `spark_session` fixture.

### Parametrized Tests (Both Engines)

You can run tests with both engines in two ways:

**Option 1: Using the `parametrize_engines` marker**
```python
import pytest

@pytest.mark.parametrize_engines
def test_both_engines(spark_session):
    # Runs with both engines automatically
    df = spark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1
```

**Option 2: Using the `both_engines` fixture**
```python
def test_both_engines(both_engines):
    spark, engine_type = both_engines
    # Runs with both engines automatically
    df = spark.createDataFrame([{"id": 1}])
    assert df.count() == 1
    # engine_type will be EngineType.MOCK or EngineType.PYSPARK
```

### Environment-Based Configuration

```bash
# Run tests with mock engine
SPARK_MODE=mock pytest tests/

# Run tests with PySpark engine
SPARK_MODE=pyspark pytest tests/

# Auto-detect (default)
SPARK_MODE=auto pytest tests/
```

### Pytest Markers

```python
import pytest

@pytest.mark.spark_engine("mock")
def test_mock_specific(spark_session):
    # Forces mock engine
    pass

@pytest.mark.spark_engine("pyspark")
def test_pyspark_specific(spark_session):
    # Forces PySpark engine
    pass
```

## Configuration

### Environment Variables

- `SPARK_MODE`: `mock`, `pyspark`, or `auto` (default: `auto`)
- `SPARK_TEST_WAREHOUSE_DIR`: Custom warehouse directory for PySpark tests
- `SPARK_TEST_APP_NAME`: Custom app name prefix (default: `sparkless-testing`)
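
One way to pin these variables for a whole test run is to set defaults early, for example at the top of `conftest.py`. The sketch below is a minimal illustration, assuming the variables are read when a session is created rather than at import time. The helper name `apply_spark_test_defaults` is hypothetical and not part of the package.

```python
import os
import tempfile


def apply_spark_test_defaults(env=os.environ):
    """Fill in defaults without overriding values the caller already exported."""
    env.setdefault("SPARK_MODE", "auto")
    env.setdefault("SPARK_TEST_APP_NAME", "sparkless-testing")
    # Create a throwaway warehouse directory only when none was supplied.
    if "SPARK_TEST_WAREHOUSE_DIR" not in env:
        env["SPARK_TEST_WAREHOUSE_DIR"] = tempfile.mkdtemp(prefix="spark-warehouse-")
    return env
```

Because `setdefault` is used, a value passed on the command line (such as `SPARK_MODE=mock pytest tests/`) still takes precedence.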

### Programmatic Configuration

```python
from sparkless_testing import (
    EngineType,
    SessionConfig,
    auto_configure_engine,
    create_session,
    create_pyspark_session,
)

# Auto-configure based on availability
auto_configure_engine()

# Or specify preferred engine
auto_configure_engine(EngineType.MOCK)

# Create session manually
spark = create_session(engine_type=EngineType.MOCK)

# Create session with custom configuration
config = SessionConfig(
    app_name="my-test",
    shuffle_partitions=2,
)
spark = create_pyspark_session(config=config)
# ... use spark ...
spark.stop()
```

## API Reference

### Engine Configuration

- `EngineType`: Enum for engine types (`MOCK`, `PYSPARK`, `AUTO`)
- `configure_engine()`: Manually configure engine components
- `get_engine()`: Get current engine configuration
- `detect_available_engines()`: Check which engines are installed
- `auto_configure_engine()`: Automatically configure based on availability
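
To illustrate what availability-based selection involves, here is a sketch that uses only the standard library. It is not the package's implementation: `installed_engines` and `pick_engine` are hypothetical names, and the preference order under `auto` (mock before PySpark) is an assumption.

```python
import importlib.util


def installed_engines(find_spec=importlib.util.find_spec):
    """Return the engine names whose top-level module can be located."""
    candidates = {"mock": "sparkless", "pyspark": "pyspark"}
    return [name for name, module in candidates.items() if find_spec(module) is not None]


def pick_engine(mode, available):
    """Resolve 'mock', 'pyspark', or 'auto' against the installed engines."""
    if mode == "auto":
        if not available:
            raise RuntimeError("no Spark engine installed")
        # Assumed preference: first available entry, with mock listed before pyspark.
        return available[0]
    if mode not in available:
        raise RuntimeError(f"engine {mode!r} requested but not installed")
    return mode
```

Checking with `find_spec` avoids importing PySpark just to learn whether it is installed.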

### Session Factory

- `create_mock_session()`: Create sparkless session
- `create_pyspark_session()`: Create PySpark session
- `create_session()`: Factory function that creates appropriate session
- `SessionConfig`: Configuration dataclass for session creation
  - `app_name`: Application name for the Spark session
  - `warehouse_dir`: Custom warehouse directory (optional)
  - `enable_ui`: Enable Spark UI (default: `False`)
  - `shuffle_partitions`: Number of shuffle partitions (default: `1`)
  - `parallelism`: Default parallelism (default: `1`)
  - `adaptive_enabled`: Enable adaptive query execution (default: `False`)
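
To make these fields concrete, the sketch below mirrors them in a stand-in dataclass and maps them onto standard Spark configuration keys. `SessionConfigSketch` and `to_spark_conf` are hypothetical, and the `app_name` default is assumed to match the `SPARK_TEST_APP_NAME` default. The real `SessionConfig` may translate its fields differently.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class SessionConfigSketch:
    """Stand-in with the documented fields and defaults (not the real class)."""
    app_name: str = "sparkless-testing"
    warehouse_dir: Optional[str] = None
    enable_ui: bool = False
    shuffle_partitions: int = 1
    parallelism: int = 1
    adaptive_enabled: bool = False


def to_spark_conf(config: SessionConfigSketch) -> Dict[str, str]:
    """Translate the fields into Spark configuration key/value strings."""
    conf = {
        "spark.app.name": config.app_name,
        "spark.ui.enabled": str(config.enable_ui).lower(),
        "spark.sql.shuffle.partitions": str(config.shuffle_partitions),
        "spark.default.parallelism": str(config.parallelism),
        "spark.sql.adaptive.enabled": str(config.adaptive_enabled).lower(),
    }
    # The warehouse directory is optional, so only set it when provided.
    if config.warehouse_dir is not None:
        conf["spark.sql.warehouse.dir"] = config.warehouse_dir
    return conf
```

The small defaults (one shuffle partition, no UI, no adaptive execution) keep per-test overhead low on tiny DataFrames.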

### Test Utilities

- `detect_spark_type(spark)`: Report whether a session is PySpark or mock (`"pyspark"` or `"mock"`)
- `create_test_dataframe(spark, data, schema)`: Compatibility wrapper for DataFrame creation
- `is_dataframe_like(obj)`: Check if object is DataFrame-like

### Pytest Fixtures

- `spark_session`: Main fixture (auto-detects engine based on `SPARK_MODE` or markers)
- `mock_spark_session`: Explicitly use sparkless (mock) engine
- `pyspark_session`: Explicitly use PySpark (real) engine
- `both_engines`: Fixture that yields `(spark, engine_type)` for both engines (parametrized)
- `spark_functions`: Functions module (F) for current engine
- `spark_types`: Types module for current engine
- `spark_engine_type`: Current engine type as string (`"mock"` or `"pyspark"`)

## Advanced Usage

### Custom Session Configuration

```python
from sparkless_testing import SessionConfig, create_pyspark_session

def test_with_custom_config():
    config = SessionConfig(
        app_name="custom-test",
        warehouse_dir="/tmp/my-warehouse",
        enable_ui=True,
        shuffle_partitions=4,
        parallelism=4,
    )
    spark = create_pyspark_session(config=config)
    # ... your test code ...
    spark.stop()
```

### Parallel Test Execution

```bash
# Run tests in parallel with pytest-xdist
pytest -n 2  # Use 2 workers
pytest -n auto  # Auto-detect worker count
```

**Best Practices for Parallel Testing:**
- Mock (sparkless) tests can run with high parallelism (`-n 10` or more)
- PySpark tests work best with fewer workers (`-n 2` or `-n 4`)
- Each test gets a unique session name and warehouse directory
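
The isolation described in the last point can be approximated as follows. This is a sketch, not the package's code: `unique_session_names` is a hypothetical helper, and it relies on the `PYTEST_XDIST_WORKER` environment variable that pytest-xdist sets in each worker process.

```python
import os
import tempfile
import uuid


def unique_session_names(prefix="sparkless-testing", env=os.environ):
    """Build an app name and warehouse directory that no other test shares."""
    # pytest-xdist sets PYTEST_XDIST_WORKER (e.g. "gw0"); fall back when run serially.
    worker = env.get("PYTEST_XDIST_WORKER", "main")
    suffix = uuid.uuid4().hex[:8]
    app_name = f"{prefix}-{worker}-{suffix}"
    # mkdtemp creates a fresh directory, so workers never write to the same path.
    warehouse_dir = tempfile.mkdtemp(prefix=f"{app_name}-")
    return app_name, warehouse_dir
```

Separate warehouse directories matter mainly for PySpark, where concurrent sessions writing to one location can interfere with each other.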

### Using Test Utilities

```python
from sparkless_testing.utils import create_test_dataframe, detect_spark_type

def test_with_utilities(spark_session):
    # Create DataFrame with schema handling
    data = [("Alice", 25), ("Bob", 30)]
    schema = ["name", "age"]
    df = create_test_dataframe(spark_session, data, schema)
    
    # Detect engine type
    engine = detect_spark_type(spark_session)
    assert engine in ("mock", "pyspark")
```

## Examples

### Complete Test Example

```python
import pytest
from sparkless_testing.pytest_fixtures import spark_session, spark_functions

def test_data_transformation(spark_session, spark_functions):
    F = spark_functions
    
    # Create test data
    data = [
        {"id": 1, "name": "Alice", "age": 25},
        {"id": 2, "name": "Bob", "age": 30},
    ]
    df = spark_session.createDataFrame(data)
    
    # Transform data
    result = (
        df.filter(F.col("age") > 25)
        .select("name", "age")
        .collect()
    )
    
    # Assertions
    assert len(result) == 1
    assert result[0]["name"] == "Bob"
```

### Using with Both Engines

```python
import pytest

@pytest.mark.parametrize_engines
def test_compatibility(spark_session, spark_engine_type):
    # This test runs with both engines
    df = spark_session.createDataFrame([{"value": 42}])
    
    # Engine-specific logic if needed
    if spark_engine_type == "mock":
        # Mock-specific assertions
        pass
    else:
        # PySpark-specific assertions
        pass
    
    assert df.count() == 1
```

## Troubleshooting

### Tests Hang with Parallel Execution

If tests hang when using `pytest -n`, try:
- Use fewer workers: `pytest -n 2` instead of `pytest -n 10`
- Run PySpark-only tests sequentially: `pytest -n 0`
- Use mock engine for faster parallel execution: `SPARK_MODE=mock pytest -n 10`


### SparkContext Errors

Always use `spark_functions` together with `spark_session`:
```python
def test_correct(spark_session, spark_functions):  # ✅ Correct
    F = spark_functions
    df = spark_session.createDataFrame([{"x": 1}])
    result = df.select(F.col("x")).collect()

def test_incorrect(spark_functions):  # ❌ May fail with PySpark
    F = spark_functions
    # Missing active SparkContext
```

## Migration Guide

### From Manual Engine Switching

**Before:**
```python
import os
from sparkless import SparkSession as MockSparkSession
from pyspark.sql import SparkSession as PySparkSession

def test_my_function():
    if os.environ.get("SPARK_MODE") == "mock":
        spark = MockSparkSession("test")
    else:
        spark = PySparkSession.builder.appName("test").getOrCreate()
    # ... test code ...
    spark.stop()
```

**After:**
```python
from sparkless_testing.pytest_fixtures import spark_session

def test_my_function(spark_session):
    # ... test code ...
    # spark_session is automatically cleaned up
```

## Requirements

- Python 3.9+
- pytest 7.0+

Optional:
- sparkless>=3.19.0 (for mock engine)
- pyspark>=3.5.0,<3.6.0 (for real engine)
- pytest-xdist (for parallel test execution)

## License

MIT License - see [LICENSE](LICENSE) file for details.

## Links

- **GitHub**: [github.com/eddiethedean/sparkless-testing](https://github.com/eddiethedean/sparkless-testing)
- **Issues**: [github.com/eddiethedean/sparkless-testing/issues](https://github.com/eddiethedean/sparkless-testing/issues)

---

**Made with ❤️ for the data engineering community**

