Metadata-Version: 2.4
Name: lucid-pipeline
Version: 0.1.1
Summary: A clean, expressive pipeline pattern for Python.
Project-URL: Homepage, https://github.com/sharik709/lucid-pipeline
Project-URL: Documentation, https://github.com/sharik709/lucid-pipeline#readme
Project-URL: Repository, https://github.com/sharik709/lucid-pipeline
Project-URL: Issues, https://github.com/sharik709/lucid-pipeline/issues
Author: Sharik Shaikh
License-Expression: MIT
License-File: LICENSE
Keywords: chain,data-processing,middleware,pipe,pipeline,workflow
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.10
Provides-Extra: dev
Requires-Dist: mypy>=1.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.21; extra == 'dev'
Requires-Dist: pytest>=7.0; extra == 'dev'
Requires-Dist: ruff>=0.1; extra == 'dev'
Description-Content-Type: text/markdown

# Lucid Pipeline

**A clean, expressive pipeline pattern for Python — pass data through a series of steps with elegance.**

Stop writing deeply nested function calls and messy `for` loops for multi-step data processing. Lucid Pipeline gives Python a clean, expressive way to pass data through a series of steps — readable, testable, and beautiful.

[![PyPI version](https://badge.fury.io/py/lucid-pipeline.svg)](https://pypi.org/project/lucid-pipeline/)
[![Python 3.10+](https://img.shields.io/badge/python-3.10%2B-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](LICENSE)

---

## Before & After

**Without Lucid Pipeline:**

```python
data = get_request_data()
data = strip_whitespace(data)
data = validate_fields(data)
result = normalize_emails(data)
if user.is_premium:
    result = apply_premium_features(result)
try:
    return save_to_database(result)
except Exception:
    return handle_error(result)
```

**With Lucid Pipeline:**

```python
from lucid_pipeline import Pipeline

result = (
    Pipeline(get_request_data())
    .through([strip_whitespace, validate_fields, normalize_emails])
    .when(user.is_premium, [apply_premium_features])
    .on_failure(handle_error)
    .then(save_to_database)
)
```

---

## Installation

```bash
pip install lucid-pipeline
```

Requires Python 3.10 or higher. No runtime dependencies.

---

## Quick Start

### Simple function pipeline

```python
from lucid_pipeline import Pipeline

def double(value):
    return value * 2

def add_ten(value):
    return value + 10

def to_string(value):
    return f"Result: {value}"

result = Pipeline(5).through([double, add_ten, to_string]).then_return()
# result = "Result: 20"
```

### With a final destination

```python
result = Pipeline(5).through([double, add_ten]).then(to_string)
# result = "Result: 20"
```

### Class-based pipes

```python
from lucid_pipeline import Pipe

class TrimStrings(Pipe):
    def handle(self, data, next_pipe):
        if isinstance(data, dict):
            data = {k: v.strip() if isinstance(v, str) else v for k, v in data.items()}
        return next_pipe(data)

class ConvertEmptyStringsToNone(Pipe):
    def handle(self, data, next_pipe):
        if isinstance(data, dict):
            data = {k: (None if v == "" else v) for k, v in data.items()}
        return next_pipe(data)

result = (
    Pipeline({"name": "  John  ", "age": 30, "bio": ""})
    .through([TrimStrings(), ConvertEmptyStringsToNone()])
    .then_return()
)
# result = {"name": "John", "age": 30, "bio": None}
```

---

## Full API Reference

### `Pipeline(passable)`

Creates a new pipeline with the data to be passed through.

| Parameter   | Type  | Description                                    |
|-------------|-------|------------------------------------------------|
| `passable`  | `Any` | The data that will flow through the pipeline.  |

```python
pipeline = Pipeline({"email": "JOHN@EXAMPLE.COM"})
```

---

### `.through(pipes)`

Adds pipes to the list the data will travel through. Can be called multiple times: each call **appends** to the existing pipe list rather than replacing it.

| Parameter | Type                          | Description                         |
|-----------|-------------------------------|-------------------------------------|
| `pipes`   | `list[Callable \| Pipe]`      | List of functions or Pipe instances.|

A pipe can be any of:

1. **A plain function** — receives the passable, returns transformed passable.
2. **A Pipe subclass instance** — must implement `handle(self, data, next_pipe)`.
3. **A lambda** — for inline transforms.

```python
Pipeline(data).through([step_one, step_two]).through([step_three])
# All three steps run in order: step_one → step_two → step_three
```
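To make the append semantics concrete, here is a minimal sketch built on a hypothetical `MiniPipeline` stand-in (not the library's class). It supports only plain-function pipes and shows that repeated `.through()` calls, and `.pipe()`, accumulate steps in order:

```python
from typing import Any, Callable

Fn = Callable[[Any], Any]


class MiniPipeline:
    """Hypothetical stand-in for Pipeline; supports plain-function pipes only."""

    def __init__(self, passable: Any) -> None:
        self._passable = passable
        self._pipes: list[Fn] = []

    def through(self, pipes: list[Fn]) -> "MiniPipeline":
        self._pipes.extend(pipes)  # append, never replace
        return self  # return self so calls can be chained

    def pipe(self, single_pipe: Fn) -> "MiniPipeline":
        return self.through([single_pipe])  # sugar for through([single_pipe])

    def then_return(self) -> Any:
        value = self._passable
        for fn in self._pipes:  # run pipes in the order they were added
            value = fn(value)
        return value


calls: list[str] = []


def step_one(x: int) -> int:
    calls.append("step_one")
    return x + 1


def step_two(x: int) -> int:
    calls.append("step_two")
    return x * 10


def step_three(x: int) -> int:
    calls.append("step_three")
    return x - 5


result = MiniPipeline(1).through([step_one, step_two]).pipe(step_three).then_return()
# 1 -> 2 -> 20 -> 15
```

Returning `self` from every builder method is what makes the fluent chain possible; it also means the instance is mutated as it is configured.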

---

### `.pipe(single_pipe)`

Appends a single pipe. Syntactic sugar for `.through([single_pipe])`.

```python
Pipeline(data).pipe(validate).pipe(transform).pipe(save).then_return()
```

---

### `.when(condition, pipes)`

Conditionally adds pipes. The pipes only run if `condition` is truthy. If `condition` is a callable, it receives the current passable and is evaluated at execution time.

| Parameter   | Type                              | Description                              |
|-------------|-----------------------------------|------------------------------------------|
| `condition` | `bool \| Callable[[Any], bool]`   | Static value or callable returning bool. |
| `pipes`     | `list[Callable \| Pipe]`          | Pipes to add if condition is met.        |

```python
# Static condition (evaluated immediately)
Pipeline(order).when(order.has_coupon, [apply_discount]).then_return()

# Dynamic condition (evaluated at pipeline execution time)
Pipeline(order).when(lambda data: data.total > 100, [apply_bulk_discount]).then_return()
```

---

### `.unless(condition, pipes)`

Inverse of `.when()` — pipes run only if the condition is falsy.

```python
Pipeline(user).unless(user.is_verified, [send_verification_prompt]).then_return()
```

---

### `.tap(callback)`

Runs a side effect without modifying the passable. Useful for logging, debugging, or emitting events.

| Parameter  | Type                    | Description                                                |
|------------|-------------------------|------------------------------------------------------------|
| `callback` | `Callable[[Any], None]` | Called with the current passable. Return value is ignored. |

```python
result = (
    Pipeline(request)
    .through([authenticate])
    .tap(lambda data: logger.info(f"Authenticated: {data.user_id}"))
    .through([authorize, process])
    .then_return()
)
```
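One way a tap could be expressed is as an ordinary function pipe that calls the callback and then returns its input. The sketch below assumes a hypothetical `tap_pipe` helper; it is not the library's implementation:

```python
from typing import Any, Callable


def tap_pipe(callback: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Turn a side-effect callback into a function pipe (hypothetical helper)."""

    def _pipe(data: Any) -> Any:
        callback(data)  # run the side effect; its return value is discarded
        return data  # hand the same passable to the next pipe

    return _pipe


seen: list[Any] = []


def record(data: Any) -> str:
    seen.append(data)
    return "this return value is ignored"


payload = {"user_id": 7}
result = tap_pipe(record)(payload)
```

Because the same object is passed through, a callback that mutates the passable in place would still affect downstream pipes, so tap callbacks are best kept read-only.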

---

### `.on_failure(handler)`

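Registers an exception handler. If any pipe raises an exception, the handler is called with `(passable, exception)`, where `passable` is the original input rather than the intermediate value, and the handler's return value becomes the pipeline result instead of the exception propagating.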

| Parameter | Type                                     | Description                                |
|-----------|------------------------------------------|--------------------------------------------|
| `handler` | `Callable[[Any, Exception], Any]`        | Receives the passable and the exception.   |

```python
def handle_error(data, error):
    log_error(error)
    return {"success": False, "error": str(error)}

result = (
    Pipeline(data)
    .through([risky_step_one, risky_step_two])
    .on_failure(handle_error)
    .then_return()
)
```

If no failure handler is set, exceptions propagate normally.
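The contract above (original passable in, handler result out, re-raise when no handler is set) can be sketched for function pipes as follows. `run_with_failure_handler` is a hypothetical helper for illustration, not part of the library:

```python
from typing import Any, Callable, Optional

Fn = Callable[[Any], Any]
Handler = Callable[[Any, Exception], Any]


def run_with_failure_handler(passable: Any, pipes: list[Fn], handler: Optional[Handler] = None) -> Any:
    value = passable
    try:
        for fn in pipes:
            value = fn(value)
        return value
    except Exception as exc:
        if handler is None:
            raise  # no handler registered: the exception propagates unchanged
        # The handler receives the ORIGINAL passable, not the intermediate value.
        return handler(passable, exc)


def shout(text: str) -> str:
    return text.upper()


def explode(text: str) -> str:
    raise ValueError(f"cannot process {text!r}")


def handle_error(data: Any, error: Exception) -> dict[str, Any]:
    return {"success": False, "input": data, "error": str(error)}


result = run_with_failure_handler("hello", [shout, explode], handle_error)
```

Note that the error message mentions `'HELLO'` (the intermediate value seen by the failing pipe), while the handler's `data` argument is still the original `"hello"`.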

---

### `.then(destination)`

Executes the pipeline and passes the final result to a destination function.

| Parameter     | Type                  | Description                            |
|---------------|-----------------------|----------------------------------------|
| `destination` | `Callable[[Any], Any]`| Final function that receives the result.|

```python
result = Pipeline(data).through([validate, transform]).then(save_to_database)
```

---

### `.then_return()`

Executes the pipeline and returns the result directly without a destination.

```python
result = Pipeline(data).through([validate, transform]).then_return()
```

---

### `Pipe` (Base Class)

Abstract base class for class-based pipes.

```python
from lucid_pipeline import Pipe

class MyPipe(Pipe):
    def handle(self, data, next_pipe):
        # Transform data
        data["processed"] = True
        # MUST call next_pipe to continue the pipeline
        return next_pipe(data)
```

**The `next_pipe` callback:**

Class-based pipes receive a `next_pipe` callable. This is what makes them powerful — you can run logic before AND after the rest of the pipeline (like middleware):

```python
import time

from lucid_pipeline import Pipe

class TimingPipe(Pipe):
    def handle(self, data, next_pipe):
        start = time.perf_counter()
        result = next_pipe(data)                  # Run the remaining pipes
        elapsed = time.perf_counter() - start     # Runs AFTER all downstream pipes finish
        print(f"Downstream pipes took {elapsed:.2f}s")
        return result
```

**Short-circuiting:** Returning without calling `next_pipe` stops the pipeline early.

```python
class AuthorizePipe(Pipe):
    def handle(self, data, next_pipe):
        if not data.get("is_authorized"):
            return {"error": "Unauthorized"}  # Pipeline stops here
        return next_pipe(data)
```

---

## Async Support

Use `AsyncPipeline` for async pipes. It offers the same methods as `Pipeline`, but the pipeline must be awaited when it runs (for example, `await ... .then_return()`).

```python
import asyncio

from lucid_pipeline import AsyncPipeline, AsyncPipe

async def fetch_user(data):
    data["user"] = await database.get_user(data["user_id"])
    return data

async def enrich_profile(data):
    data["profile"] = await external_api.get_profile(data["user"])
    return data

class AsyncCachePipe(AsyncPipe):
    async def handle(self, data, next_pipe):
        key = f"profile:{data['user_id']}"
        cached = await cache.get(key)
        if cached is not None:  # a falsy cached value is still a cache hit
            return cached
        result = await next_pipe(data)
        await cache.set(key, result, ttl=300)
        return result

async def main():
    return await (
        AsyncPipeline({"user_id": 42})
        .through([AsyncCachePipe(), fetch_user, enrich_profile])
        .then_return()
    )

result = asyncio.run(main())
```

`AsyncPipeline` accepts both sync and async callables: async pipes are awaited, while sync pipes are called directly within the async pipeline.
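A minimal sketch of how an async runner could accept both kinds of function pipes, assuming a hypothetical `run_mixed` helper (not part of the library; class-based `AsyncPipe` handling is left out):

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_mixed(passable: Any, pipes: list[Callable[[Any], Any]]) -> Any:
    value = passable
    for fn in pipes:
        result = fn(value)
        # Async functions return awaitables; sync functions return plain values.
        if inspect.isawaitable(result):
            result = await result
        value = result
    return value


def add_one(x: int) -> int:
    return x + 1


async def double_later(x: int) -> int:
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return x * 2


result = asyncio.run(run_mixed(3, [add_one, double_later, add_one]))
# 3 -> 4 -> 8 -> 9
```

Checking the returned value with `inspect.isawaitable` rather than inspecting the function with `inspect.iscoroutinefunction` also covers sync callables that happen to return a coroutine, such as a lambda wrapping an `async def`.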

---

## Real-World Examples

### HTTP Request Middleware

```python
class CORSMiddleware(Pipe):
    def handle(self, request, next_pipe):
        response = next_pipe(request)
        response.headers["Access-Control-Allow-Origin"] = "*"
        return response

class RateLimitMiddleware(Pipe):
    def __init__(self, max_requests: int = 100):
        self.max_requests = max_requests

    def handle(self, request, next_pipe):
        if get_request_count(request.ip) > self.max_requests:
            return Response(status=429)
        return next_pipe(request)

response = (
    Pipeline(incoming_request)
    .through([
        CORSMiddleware(),
        RateLimitMiddleware(max_requests=60),
        AuthenticationMiddleware(),
    ])
    .then(route_to_controller)
)
```

### ETL / Data Processing

```python
result = (
    Pipeline(raw_csv_rows)
    .through([
        remove_empty_rows,
        parse_dates,
        normalize_currencies,
    ])
    .when(config.remove_outliers, [filter_outliers])
    .tap(lambda data: print(f"Processing {len(data)} rows"))
    .through([aggregate_by_region])
    .then(write_to_parquet)
)
```

### Form Validation

```python
import re

from lucid_pipeline import Pipe, Pipeline

class ValidationError(Exception):
    """Application-defined error raised for invalid form input."""

class ValidateRequired(Pipe):
    def __init__(self, fields: list[str]):
        self.fields = fields

    def handle(self, data, next_pipe):
        missing = [f for f in self.fields if not data.get(f)]
        if missing:
            raise ValidationError(f"Missing required fields: {missing}")
        return next_pipe(data)

class ValidateEmail(Pipe):
    def handle(self, data, next_pipe):
        email = data.get("email", "")
        if not re.match(r"^[^@]+@[^@]+\.[^@]+$", email):
            raise ValidationError("Invalid email address")
        return next_pipe(data)

class NormalizeData(Pipe):
    def handle(self, data, next_pipe):
        data["email"] = data["email"].lower().strip()
        data["name"] = data["name"].strip().title()
        return next_pipe(data)

def on_validation_error(data, error):
    return {"valid": False, "errors": str(error), "data": data}

result = (
    Pipeline(form_data)
    .through([
        ValidateRequired(["name", "email"]),
        ValidateEmail(),
        NormalizeData(),
    ])
    .on_failure(on_validation_error)
    .then(lambda data: {"valid": True, "data": data})
)
```

---

## Architecture

### Project Structure

```
lucid-pipeline/
├── src/
│   └── lucid_pipeline/
│       ├── __init__.py          # Public API exports
│       ├── pipeline.py          # Pipeline class
│       ├── async_pipeline.py    # AsyncPipeline class
│       ├── pipe.py              # Pipe base class
│       ├── async_pipe.py        # AsyncPipe base class
│       └── exceptions.py        # PipelineError, PipelineFlowError
├── tests/
│   ├── __init__.py
│   ├── test_pipeline.py         # Pipeline unit tests
│   ├── test_async_pipeline.py   # AsyncPipeline unit tests
│   ├── test_pipe.py             # Pipe class tests
│   ├── test_conditional.py      # .when() / .unless() tests
│   ├── test_error_handling.py   # .on_failure() tests
│   ├── test_tap.py              # .tap() tests
│   └── test_edge_cases.py       # Empty pipes, short-circuit, etc.
├── pyproject.toml
├── README.md
├── LICENSE                      # MIT License
└── CHANGELOG.md
```

### Implementation Notes

**Pipeline execution model:**

The pipeline uses a **reducer pattern** internally. For simple function pipes, each function receives the passable and returns the transformed value. For class-based `Pipe` instances, the pipeline builds a nested callable chain (a middleware-style "onion") in which each pipe's `handle()` receives a `next_pipe` callback that invokes the rest of the chain.

```
# Internal execution for function pipes:
# result = pipe_3(pipe_2(pipe_1(passable)))

# Internal execution for class-based pipes:
# Each handle() wraps the next, forming an onion:
# pipe_1.handle(data, lambda d: pipe_2.handle(d, lambda d: pipe_3.handle(d, identity)))
```
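The onion can be built with a single fold over the pipe list. The sketch below uses a stand-in `Pipe` class and a hypothetical `build_chain` helper; it illustrates the reducer pattern described above and is not the library's source:

```python
from functools import reduce
from typing import Any, Callable

Next = Callable[[Any], Any]


class Pipe:
    """Stand-in for the library's Pipe base class (hypothetical, for this sketch)."""

    def handle(self, data: Any, next_pipe: Next) -> Any:
        raise NotImplementedError


def build_chain(pipes: list[Pipe], destination: Next) -> Next:
    def wrap(next_pipe: Next, pipe: Pipe) -> Next:
        # Each layer closes over its own pipe and the rest of the chain.
        return lambda data: pipe.handle(data, next_pipe)

    # Fold from the last pipe outwards so pipes[0] ends up as the outermost layer.
    return reduce(wrap, reversed(pipes), destination)


trace: list[str] = []


class Around(Pipe):
    """Records when it runs relative to the pipes inside it."""

    def __init__(self, name: str) -> None:
        self.name = name

    def handle(self, data: Any, next_pipe: Next) -> Any:
        trace.append(f"before {self.name}")
        result = next_pipe(data + [self.name])
        trace.append(f"after {self.name}")
        return result


class StopWhenFull(Pipe):
    """Short-circuits by returning without calling next_pipe."""

    def handle(self, data: Any, next_pipe: Next) -> Any:
        if len(data) >= 2:
            return data + ["stopped"]
        return next_pipe(data)


def identity(data: Any) -> Any:
    return data


result = build_chain([Around("a"), Around("b")], identity)([])
```

The trace shows the middleware shape: `a` starts first and finishes last, and a pipe that returns without calling `next_pipe` prevents every inner layer from running.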

**Mixed pipes (functions + classes):**

When a pipeline contains both plain functions and Pipe instances, the pipeline wraps plain functions in a compatibility layer that calls `next_pipe` automatically:

```python
# Internally, a plain function like:
def double(x): return x * 2

# Gets wrapped as if it were:
class _WrappedDouble(Pipe):
    def handle(self, data, next_pipe):
        return next_pipe(double(data))
```
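Here is a slightly fuller sketch of that compatibility layer, assuming a hypothetical `FunctionPipe` adapter and `as_pipe` normalizer (neither is confirmed as library API):

```python
from typing import Any, Callable

Next = Callable[[Any], Any]


class FunctionPipe:
    """Adapter giving a plain function the handle(data, next_pipe) shape (hypothetical)."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def handle(self, data: Any, next_pipe: Next) -> Any:
        # Transform, then always continue: a plain function cannot short-circuit.
        return next_pipe(self.fn(data))


def as_pipe(candidate: Any) -> Any:
    """Normalize a pipe entry so the chain builder only ever sees handle()-style objects."""
    if hasattr(candidate, "handle"):
        return candidate  # already class-based: use as-is
    if callable(candidate):
        return FunctionPipe(candidate)  # plain function or lambda: wrap it
    raise TypeError(f"not a pipe: {candidate!r}")


def double(x: int) -> int:
    return x * 2


class Passthrough:
    def handle(self, data: Any, next_pipe: Next) -> Any:
        return next_pipe(data)


passthrough = Passthrough()
result = as_pipe(double).handle(5, lambda x: x + 1)  # double(5) is 10, then next_pipe adds 1
```

Checking for `handle` before `callable` matters: a class-based pipe that also defines `__call__` should still be treated as middleware, not wrapped as a plain function.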

**Conditional evaluation:**

- `.when(value, pipes)`: if the condition is not callable (typically a plain bool), its truthiness is checked at pipeline construction time, and the pipes are either included in or excluded from the chain before execution.
- `.when(callable, pipes)`: if the condition is callable, it is called at execution time with the current passable. This enables dynamic branching based on intermediate results.
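Both rules, plus `.unless()` as a negated `.when()`, fit in one small function. This is a sketch assuming a hypothetical `conditional_pipes` helper that returns the pipes to splice into the chain; it is not the library's code:

```python
from typing import Any, Callable, Union

Fn = Callable[[Any], Any]
Condition = Union[bool, Callable[[Any], bool]]


def conditional_pipes(condition: Condition, pipes: list[Fn], negate: bool = False) -> list[Fn]:
    """Return the pipes to splice into the chain; negate=True gives .unless() semantics."""
    if not callable(condition):
        # Static condition: resolved right now, at construction time.
        include = bool(condition) != negate
        return list(pipes) if include else []

    def gated(data: Any) -> Any:
        # Dynamic condition: resolved at execution time against the current passable.
        if bool(condition(data)) != negate:
            for fn in pipes:
                data = fn(data)
        return data

    return [gated]


def apply_discount(total: int) -> int:
    return total - 10


static_on = conditional_pipes(True, [apply_discount])
static_off = conditional_pipes(False, [apply_discount])
dynamic = conditional_pipes(lambda total: total > 100, [apply_discount])
unless_big = conditional_pipes(lambda total: total > 100, [apply_discount], negate=True)
```

A static condition costs nothing at run time because excluded pipes never enter the chain, while a callable condition is wrapped in a single gate that is evaluated once, against the value that reaches it.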

**Thread safety:**

Pipeline instances are NOT thread-safe: builder methods such as `.through()` append to the instance's own pipe list. Create a new `Pipeline()` per request or task. `Pipe` instances can be shared across pipelines as long as they hold no mutable state.

**Type hints:**

The package should be fully typed with generics where practical:

```python
from lucid_pipeline import Pipeline

# Pipeline[InputType] tracks the passable type
pipeline: Pipeline[dict] = Pipeline({"key": "value"})
```

### Public API (what `__init__.py` exports)

```python
from lucid_pipeline.pipeline import Pipeline
from lucid_pipeline.async_pipeline import AsyncPipeline
from lucid_pipeline.pipe import Pipe
from lucid_pipeline.async_pipe import AsyncPipe
from lucid_pipeline.exceptions import PipelineError, PipelineFlowError

__all__ = [
    "Pipeline",
    "AsyncPipeline",
    "Pipe",
    "AsyncPipe",
    "PipelineError",
    "PipelineFlowError",
]
```

### Exceptions

| Exception            | When                                                         |
|----------------------|--------------------------------------------------------------|
| `PipelineError`      | Base exception for all pipeline errors.                      |
| `PipelineFlowError`  | Raised when the pipeline is misconfigured.                   |
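A minimal sketch of how the two exceptions might relate, assuming `PipelineFlowError` subclasses `PipelineError` as the table implies. `validate_pipe` is a hypothetical configuration check, not part of the documented API:

```python
class PipelineError(Exception):
    """Base class for all pipeline errors."""


class PipelineFlowError(PipelineError):
    """Raised when a pipeline is misconfigured."""


def validate_pipe(candidate: object) -> object:
    # Hypothetical check: an entry must be callable or expose a handle() method.
    if not callable(candidate) and not hasattr(candidate, "handle"):
        raise PipelineFlowError(f"{candidate!r} is not a valid pipe")
    return candidate


try:
    validate_pipe(42)
except PipelineError as exc:  # the base class catches every pipeline error
    caught = exc
```

Rooting every library error in one base class lets callers write a single `except PipelineError` without also swallowing unrelated exceptions raised by their own pipes.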

---

## pyproject.toml Specification

```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "lucid-pipeline"
version = "0.1.1"
description = "A clean, expressive pipeline pattern for Python."
readme = "README.md"
license = "MIT"
requires-python = ">=3.10"
authors = [
    { name = "Sharik Shaikh" },
]
keywords = ["pipeline", "middleware", "chain", "pipe", "data-processing", "workflow"]
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Programming Language :: Python :: 3.13",
    "Topic :: Software Development :: Libraries :: Python Modules",
    "Typing :: Typed",
]

[project.urls]
Homepage = "https://github.com/sharik709/lucid-pipeline"
Documentation = "https://github.com/sharik709/lucid-pipeline#readme"
Repository = "https://github.com/sharik709/lucid-pipeline"
Issues = "https://github.com/sharik709/lucid-pipeline/issues"

[project.optional-dependencies]
dev = ["pytest>=7.0", "pytest-asyncio>=0.21", "mypy>=1.0", "ruff>=0.1"]

[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

[tool.mypy]
strict = true
```

---

## Test Cases to Implement

### Core Pipeline

- Empty pipeline returns passable unchanged
- Single function pipe transforms data
- Multiple function pipes execute in order
- `then()` passes result to destination function
- `then_return()` returns result directly
- Pipeline is chainable: calling `.through()` appends to the pipe list and returns the same instance

### Class-Based Pipes

- Pipe subclass `handle()` is called with data and next_pipe
- Calling `next_pipe` continues the pipeline
- NOT calling `next_pipe` short-circuits (returns early)
- Pipe can run logic AFTER `next_pipe` (middleware pattern)
- Mixed function + class pipes work together

### Conditional Pipes

- `.when(True, pipes)` includes the pipes
- `.when(False, pipes)` skips the pipes
- `.when(callable, pipes)` evaluates callable at execution time
- `.unless(True, pipes)` skips the pipes
- `.unless(False, pipes)` includes the pipes
- Callable condition receives current passable value

### Tap

- `.tap()` receives current passable
- `.tap()` return value is ignored (passable unchanged)
- `.tap()` exceptions propagate normally

### Error Handling

- Without `on_failure`, exceptions propagate normally
- With `on_failure`, handler receives (passable, exception)
- Handler return value becomes the pipeline result
- Handler is called with the original passable, not the intermediate state

### Async Pipeline

- Async function pipes are awaited
- Async Pipe subclass `handle()` is awaited
- Sync functions work inside AsyncPipeline
- `.when()` / `.unless()` / `.tap()` / `.on_failure()` all work async
- Async callable conditions in `.when()` are awaited

### Edge Cases

- Pipeline with no pipes returns passable unchanged
- Pipeline with `None` as passable works
- Pipe that returns `None` explicitly passes `None` forward
- Long pipelines (100+ pipes) run without raising `RecursionError`
- `.pipe()` is equivalent to `.through([single_pipe])`
- `.through()` called multiple times appends correctly

---

## Publishing to PyPI

```bash
# Install build tools
pip install build twine

# Build the package
python -m build

# Upload to TestPyPI first
twine upload --repository testpypi dist/*

# Test install from TestPyPI
pip install --index-url https://test.pypi.org/simple/ lucid-pipeline

# Upload to real PyPI
twine upload dist/*
```

---

## Part of the Lucid Ecosystem

Lucid Pipeline is the first package in the **Lucid** ecosystem — a collection of tools bringing clarity and convention to Python development.

Coming soon:

- `lucid-container` — Dependency injection container with autowiring
- `lucid-cache` — Multi-driver cache with a unified API
- `lucid-config` — Cascading configuration management
- `lucid-events` — Event dispatcher with listeners and subscribers

---

## License

MIT License. See [LICENSE](LICENSE) for details.
