Metadata-Version: 2.4
Name: philiprehberger-data-pipeline
Version: 0.3.0
Summary: Composable data transformation pipeline with lazy evaluation
Project-URL: Homepage, https://github.com/philiprehberger/py-data-pipeline#readme
Project-URL: Repository, https://github.com/philiprehberger/py-data-pipeline
Project-URL: Issues, https://github.com/philiprehberger/py-data-pipeline/issues
Project-URL: Changelog, https://github.com/philiprehberger/py-data-pipeline/blob/main/CHANGELOG.md
Author: Philip Rehberger
License-Expression: MIT
License-File: LICENSE
Keywords: data-processing,etl,pipeline,transform,workflow
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Typing :: Typed
Requires-Python: >=3.10
Description-Content-Type: text/markdown

# philiprehberger-data-pipeline

[![Tests](https://github.com/philiprehberger/py-data-pipeline/actions/workflows/publish.yml/badge.svg)](https://github.com/philiprehberger/py-data-pipeline/actions/workflows/publish.yml)
[![PyPI version](https://img.shields.io/pypi/v/philiprehberger-data-pipeline.svg)](https://pypi.org/project/philiprehberger-data-pipeline/)
[![GitHub release](https://img.shields.io/github/v/release/philiprehberger/py-data-pipeline)](https://github.com/philiprehberger/py-data-pipeline/releases)
[![Last updated](https://img.shields.io/github/last-commit/philiprehberger/py-data-pipeline)](https://github.com/philiprehberger/py-data-pipeline/commits/main)
[![License](https://img.shields.io/github/license/philiprehberger/py-data-pipeline)](LICENSE)
[![Bug Reports](https://img.shields.io/github/issues/philiprehberger/py-data-pipeline/bug)](https://github.com/philiprehberger/py-data-pipeline/issues?q=is%3Aissue+is%3Aopen+label%3Abug)
[![Feature Requests](https://img.shields.io/github/issues/philiprehberger/py-data-pipeline/enhancement)](https://github.com/philiprehberger/py-data-pipeline/issues?q=is%3Aissue+is%3Aopen+label%3Aenhancement)
[![Sponsor](https://img.shields.io/badge/sponsor-GitHub%20Sponsors-ec6cb9)](https://github.com/sponsors/philiprehberger)

Composable data transformation pipeline with lazy evaluation.

## Installation

```bash
pip install philiprehberger-data-pipeline
```

## Usage

### Basic Pipeline

```python
from philiprehberger_data_pipeline import Pipeline

data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

result = (
    Pipeline(data)
    .filter(lambda r: r["status"] == "active")
    .map(lambda r: {**r, "name": r["name"].strip()})
    .unique_by("email")
    .sort_by("name")
    .collect()
)
# [{"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30}]
```
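
To illustrate what lazy evaluation means for a chain like this, here is a minimal plain-Python sketch built on generators. It is not the library's implementation, and the names `lazy_filter`, `lazy_map`, and `calls` are hypothetical. The point is that building the chain does no work; items are only processed when the chain is consumed, which plays the role of a terminal method such as `.collect()`.

```python
from typing import Callable, Iterable, Iterator

calls: list[str] = []  # records when work actually happens (illustration only)


def lazy_filter(fn: Callable[[dict], object], items: Iterable[dict]) -> Iterator[dict]:
    """Yield items for which fn returns a truthy value, one at a time."""
    for item in items:
        calls.append("filter")
        if fn(item):
            yield item


def lazy_map(fn: Callable[[dict], dict], items: Iterable[dict]) -> Iterator[dict]:
    """Yield fn(item) for each item, one at a time."""
    for item in items:
        calls.append("map")
        yield fn(item)


rows = [
    {"name": " Alice ", "status": "active"},
    {"name": "Bob", "status": "inactive"},
]

# Building the chain does no work yet: generators only run when consumed.
chain = lazy_map(
    lambda r: {**r, "name": r["name"].strip()},
    lazy_filter(lambda r: r["status"] == "active", rows),
)
work_before_collect = list(calls)  # snapshot taken before anything is consumed

# Consuming the chain plays the role of a terminal step.
result = list(chain)
```

Each item flows through both steps before the next item is read, so no intermediate list is built between the filter and the map.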

### Reusable Pipelines

```python
clean_users = (
    Pipeline.define()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
    .unique_by("email")
)

# active_users and archived_users are placeholders for your own lists of user dicts
active = clean_users.run(active_users)
archived = clean_users.run(archived_users)
```
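
One way to picture a reusable pipeline is as a stored list of steps that is replayed against each new input. The sketch below shows that general pattern in plain Python. It is an assumption about the idea, not the library's code, and `StepList` with its methods is a hypothetical stand-in.

```python
from typing import Any, Callable, Iterable, Iterator

Step = Callable[[Iterable[Any]], Iterator[Any]]


class StepList:
    """Hypothetical stand-in: records steps now, applies them to data later."""

    def __init__(self, steps: tuple[Step, ...] = ()) -> None:
        self._steps = steps

    def filter(self, fn: Callable[[Any], Any]) -> "StepList":
        # Return a new StepList so the original definition stays unchanged.
        return StepList(self._steps + (lambda items: (x for x in items if fn(x)),))

    def map(self, fn: Callable[[Any], Any]) -> "StepList":
        return StepList(self._steps + (lambda items: (fn(x) for x in items),))

    def run(self, data: Iterable[Any]) -> list[Any]:
        # Thread the data through every recorded step, then materialize.
        items: Iterable[Any] = data
        for step in self._steps:
            items = step(items)
        return list(items)


clean = (
    StepList()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
)

# The same definition is applied to two independent inputs.
first = clean.run([{"email": "A@Example.com"}, {"email": None}])
second = clean.run([{"email": "B@Example.com"}])
```

Because the definition holds no data of its own, running it twice cannot leak state from one input into the next.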

### Sliding Window

```python
data = [1, 2, 3, 4, 5]

# Window of size 3, step 1
Pipeline(data).window(3, 1).collect()
# [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

# Window of size 3, step 2
Pipeline(data).window(3, 2).collect()
# [[1, 2, 3], [3, 4, 5]]
```
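
For readers who want to see the windowing logic spelled out, here is a minimal standard-library sketch that reproduces the two outputs above. The function name `sliding_window` is hypothetical, and dropping an incomplete trailing window is an assumption; the library's behavior for inputs that do not divide evenly is not documented here.

```python
from collections import deque
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def sliding_window(items: Iterable[T], size: int, step: int = 1) -> Iterator[list[T]]:
    """Yield full windows of `size` items, advancing `step` items each time.

    Assumption: an incomplete trailing window is dropped.
    """
    if size < 1 or step < 1:
        raise ValueError("size and step must be positive")
    it = iter(items)
    # maxlen makes the deque discard old items from the left as new ones arrive.
    window = deque(islice(it, size), maxlen=size)
    while len(window) == size:
        yield list(window)
        # Pull the next `step` items; stop if the input runs out first.
        fresh = list(islice(it, step))
        if len(fresh) < step:
            return
        window.extend(fresh)


step_one = list(sliding_window([1, 2, 3, 4, 5], 3, 1))
step_two = list(sliding_window([1, 2, 3, 4, 5], 3, 2))
```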

### Deduplication

```python
Pipeline([1, 2, 3, 2, 1, 4]).deduplicate().collect()
# [1, 2, 3, 4]

# Works with unhashable items too
Pipeline([{"a": 1}, {"a": 1}, {"b": 2}]).deduplicate().collect()
# [{"a": 1}, {"b": 2}]
```
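
Order-preserving deduplication that also accepts unhashable items can be sketched as follows. This is one possible approach, not necessarily how the library does it: hashable items go through a set, and anything unhashable falls back to an equality scan over a list. The name `dedupe` is hypothetical.

```python
from typing import Any, Iterable, Iterator


def dedupe(items: Iterable[Any]) -> Iterator[Any]:
    """Yield each distinct item once, in first-seen order."""
    seen_hashable: set[Any] = set()
    seen_unhashable: list[Any] = []
    for item in items:
        try:
            if item in seen_hashable:
                continue
            seen_hashable.add(item)
        except TypeError:
            # Unhashable (e.g. dict or list): fall back to equality comparison.
            if item in seen_unhashable:
                continue
            seen_unhashable.append(item)
        yield item


numbers = list(dedupe([1, 2, 3, 2, 1, 4]))
dicts = list(dedupe([{"a": 1}, {"a": 1}, {"b": 2}]))
```

The fallback path compares each unhashable item against every earlier one, so it is quadratic in the number of unhashable items. That is usually acceptable for small inputs but worth knowing for large ones.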

### Aggregations

```python
# sales_data is a placeholder for a list of dicts with "amount" and "category" fields
p = Pipeline(sales_data)
total = p.sum("amount")
average = p.avg("amount")
grouped = p.group_by("category")
```
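
As a rough guide to what these aggregations compute, here are plain-Python equivalents over a small sample. The sample rows are made up for illustration, and the sketch assumes that a string key names a field in each dict; the exact return types of the library's methods may differ.

```python
from collections import defaultdict

# Hypothetical sample rows; field names follow the example above.
sales = [
    {"category": "books", "amount": 12.0},
    {"category": "games", "amount": 30.0},
    {"category": "books", "amount": 8.0},
]

amounts = [row["amount"] for row in sales]
total = sum(amounts)
average = total / len(amounts)

# Group rows into a dict keyed by category, keeping input order within each group.
grouped: dict[str, list[dict]] = defaultdict(list)
for row in sales:
    grouped[row["category"]].append(row)
```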

### Export

```python
Pipeline(data).filter(...).to_csv("output.csv")
Pipeline(data).filter(...).to_json("output.json")
```
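
For comparison, writing a list of dicts to CSV and JSON with only the standard library might look like the sketch below. This is not the library's exporter. It assumes every row has the same keys, and the choice of column order, encoding, and indentation here is illustrative rather than a statement about what `.to_csv` and `.to_json` produce.

```python
import csv
import json
import tempfile
from pathlib import Path

rows = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
]

# Write into a temporary directory so the sketch leaves the working directory alone.
out_dir = Path(tempfile.mkdtemp())
csv_path = out_dir / "output.csv"
json_path = out_dir / "output.json"

# CSV: take the header from the first row's keys (assumes uniform rows).
with csv_path.open("w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=list(rows[0]))
    writer.writeheader()
    writer.writerows(rows)

# JSON: write the whole list as one array.
with json_path.open("w", encoding="utf-8") as f:
    json.dump(rows, f, indent=2)
```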

## API

| Function / Class | Description |
|------------------|-------------|
| `Pipeline(data)` | Composable, lazy data transformation pipeline with chainable operations and terminal methods |
| `Pipeline.define()` | Create a reusable pipeline definition that is not yet bound to data |
| `.run(data)` | Apply a defined pipeline to `data` |
| `.filter(fn)` | Keep items where fn returns a truthy value |
| `.map(fn)` | Transform each item |
| `.flat_map(fn)` | Transform and flatten |
| `.flatten()` | Flatten one level of nesting |
| `.sort_by(key)` | Sort by key (string or callable) |
| `.unique_by(key)` | Remove duplicates by key |
| `.take(n)` | Take first n items |
| `.skip(n)` | Skip first n items |
| `.chunk(size)` | Split into chunks |
| `.each(fn)` | Execute side effect for each item |
| `.window(size, step)` | Sliding window grouping |
| `.deduplicate()` | Remove duplicate items preserving order |
| `.collect()` | Execute and return list |
| `.first()` | Return first item |
| `.count()` | Count items |
| `.sum(key)` | Sum values |
| `.avg(key)` | Average values |
| `.min(key)` | Find minimum value |
| `.max(key)` | Find maximum value |
| `.reduce(fn, initial)` | Reduce to single value |
| `.group_by(key)` | Group into dict |
| `.to_csv(path)` | Export as CSV |
| `.to_json(path)` | Export as JSON |
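
Several of the operations in the table have close standard-library analogues, which can help when reasoning about what a step does. The sketch below shows rough equivalents for `.take`, `.skip`, `.chunk`, `.flatten`, and `.reduce`. The helper `chunked` is hypothetical, and letting the final chunk be shorter than `size` is an assumption about `.chunk`, not documented behavior.

```python
from functools import reduce
from itertools import chain, islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of up to `size` items.

    Assumption: the final chunk may be shorter than `size`.
    """
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


numbers = [1, 2, 3, 4, 5]

taken = list(islice(numbers, 2))                    # first 2 items, like .take(2)
skipped = list(islice(numbers, 2, None))            # all but the first 2, like .skip(2)
chunks = list(chunked(numbers, 2))                  # groups of 2, like .chunk(2)
flat = list(chain.from_iterable(chunks))            # one level flattened, like .flatten()
total = reduce(lambda acc, x: acc + x, numbers, 0)  # fold to one value, like .reduce(fn, 0)
```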

## Development

```bash
pip install -e .
python -m pytest tests/ -v
```

## Support

If you find this package useful, consider giving it a star on GitHub. Stars help motivate continued maintenance and development.

[![LinkedIn](https://img.shields.io/badge/Philip%20Rehberger-LinkedIn-0A66C2?logo=linkedin)](https://www.linkedin.com/in/philiprehberger)
[![More packages](https://img.shields.io/badge/more-open%20source%20packages-blue)](https://philiprehberger.com/open-source-packages)

## License

[MIT](LICENSE)
