Metadata-Version: 2.4
Name: jsleekr-pipechain
Version: 0.1.0
Summary: Lightweight DAG-based data pipeline runner for local development
Author-email: JSLEEKR <93jslee@gmail.com>
License: MIT
Keywords: pipeline,dag,etl,data,workflow
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21; extra == "dev"
Dynamic: license-file

<div align="center">

# :link: pipechain

### Lightweight DAG-based data pipeline runner for local development

[![Stars](https://img.shields.io/github/stars/JSLEEKR/pipechain?style=for-the-badge)](https://github.com/JSLEEKR/pipechain/stargazers)
[![License](https://img.shields.io/badge/License-MIT-blue?style=for-the-badge)](LICENSE)
[![Python](https://img.shields.io/badge/Python-3.9+-3776AB?style=for-the-badge&logo=python&logoColor=white)](https://python.org)
[![Tests](https://img.shields.io/badge/Tests-613-brightgreen?style=for-the-badge)](tests/)

<br/>

**Define data pipelines as Python functions. Get dependency resolution, parallel execution, retries, and caching -- without Airflow overhead.**

</div>

---

## Why This Exists

Data pipelines are everywhere, but the tooling is either massive (Airflow, Prefect, Dagster) or nonexistent (raw scripts with nested try/except).

**pipechain** fills the gap: a zero-dependency, pure-Python pipeline runner that gives you DAG-based execution, parallel steps, retry policies, result caching, and lifecycle hooks -- all in a library you can `pip install` and use in 5 lines of code.

No scheduler. No database. No Docker. Just Python functions wired into a DAG.

---

## Features

- **DAG-based execution** -- automatic dependency resolution with cycle detection
- **Parallel execution** -- independent steps run concurrently with configurable concurrency limits
- **Retry policies** -- exponential backoff, linear, jitter, per-exception filtering
- **Result caching** -- in-memory or file-based, with TTL support
- **Conditional steps** -- skip steps based on runtime context
- **Lifecycle hooks** -- before/after pipeline, before/after step, on error, on retry, on skip, on cache hit
- **Tag filtering** -- run subsets of steps by tag
- **Dry-run mode** -- validate the plan without executing
- **Shared context** -- thread-safe data store with namespaces and snapshot/rollback
- **CLI interface** -- run, validate, visualize, and plan pipelines from the command line
- **Type-safe** -- full type annotations throughout

---

## Installation

```bash
pip install pipechain
```
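
For working on pipechain itself, the project declares a `dev` extra with the test toolchain:

```bash
pip install "pipechain[dev]"  # installs pytest, pytest-cov, pytest-asyncio
```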

---

## Quick Start

```python
from pipechain import Pipeline, Step, Context

# Define steps as plain functions
def extract(ctx: Context):
    """Load raw data."""
    return {"users": [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]}

def transform(ctx: Context):
    """Filter and enrich data."""
    data = ctx.get("extract.output")
    return [u for u in data["users"] if u["age"] >= 28]

def load(ctx: Context):
    """Save results."""
    users = ctx.get("transform.output")
    ctx.set("loaded_count", len(users))
    return f"Loaded {len(users)} users"

# Build the pipeline
pipe = Pipeline("etl_example")
pipe.add_step(Step("extract", extract))
pipe.add_step(Step("transform", transform, depends_on=["extract"]))
pipe.add_step(Step("load", load, depends_on=["transform"]))

# Run it
result = pipe.run()
print(result.summary())
```
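
Each step's return value lands in the shared context under `"<step name>.output"` -- that is how `transform` reads the result of `extract` above.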

---

## Using the Decorator API

```python
from pipechain import Pipeline, step, Context
from pipechain.retry import RetryPolicy

pipe = Pipeline("decorated_etl")

@step(name="fetch", retry=RetryPolicy.exponential(max_retries=3, base_delay=0.5))
def fetch_data(ctx: Context):
    """Fetch data from API with automatic retries."""
    return {"items": list(range(100))}

@step(name="process", depends_on=["fetch"], tags=["cpu"])
def process_data(ctx: Context):
    """Process the fetched data."""
    items = ctx.get("fetch.output")["items"]
    return [x * 2 for x in items]

@step(name="save", depends_on=["process"], cache_key="save_v1", cache_ttl=3600)
def save_results(ctx: Context):
    """Save with 1-hour cache."""
    return len(ctx.get("process.output"))

pipe.add_step(fetch_data)
pipe.add_step(process_data)
pipe.add_step(save_results)

result = pipe.run()
```

---

## Parallel Execution

Steps without dependencies on each other execute in parallel automatically:

```python
from pipechain import Pipeline, Step

# Stub fetchers for illustration
def fetch_users(ctx): return ["alice", "bob"]
def fetch_orders(ctx): return [{"order_id": 1}]
def fetch_products(ctx): return [{"sku": "A-1"}]

def merge_all(ctx):
    return {k: ctx.get(f"fetch_{k}.output") for k in ("users", "orders", "products")}

pipe = Pipeline("parallel_demo", max_concurrency=4)

pipe.add_step(Step("fetch_users", fetch_users))
pipe.add_step(Step("fetch_orders", fetch_orders))
pipe.add_step(Step("fetch_products", fetch_products))
pipe.add_step(Step("merge", merge_all, depends_on=["fetch_users", "fetch_orders", "fetch_products"]))

# fetch_users, fetch_orders, fetch_products run in parallel
# merge waits for all three to complete
print(pipe.visualize())
```

Output:
```
Pipeline: parallel_demo

  Level 0:
    [fetch_orders]
    [fetch_products]
    [fetch_users]
      |
      v
  Level 1:
    [merge] <- [fetch_users, fetch_orders, fetch_products]
```
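
`max_concurrency` caps how many steps execute at once; with the limit of 4 above, all three Level 0 fetches run in a single batch.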

---

## Conditional Steps

```python
pipe.add_step(Step(
    "send_alerts",
    send_alerts,
    depends_on=["analyze"],
    condition=lambda ctx: ctx.get("analyze.output", {}).get("has_anomalies", False),
))
```
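
Here `send_alerts` runs only when the upstream `analyze` step reports anomalies. When the condition evaluates falsy, the step is skipped and the on-skip lifecycle hook fires.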

---

## Lifecycle Hooks

```python
from pipechain.hooks import HookType

pipe.on(HookType.BEFORE_PIPELINE, lambda **kw: print("Starting pipeline..."))
pipe.on(HookType.AFTER_STEP, lambda **kw: print(f"Completed: {kw['step'].name}"))
pipe.on(HookType.ON_STEP_ERROR, lambda **kw: log_error(kw['error']))  # log_error: your own handler
pipe.on(HookType.ON_STEP_RETRY, lambda **kw: print(f"Retrying {kw['step'].name}, attempt {kw['attempt']}"))
```
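
Hooks are plain callables invoked with keyword arguments; as the lambdas above illustrate, the payload includes values such as `step`, `error`, and `attempt` depending on the hook type.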

---

## Caching

```python
from pipechain.cache import MemoryCache, FileCache

# In-memory cache (fast, lost on restart)
pipe = Pipeline("cached", cache=MemoryCache())

# File-based cache (persists across runs)
pipe = Pipeline("cached", cache=FileCache(".pipeline_cache"))

# Per-step cache keys and TTL
pipe.add_step(Step("expensive", compute, cache_key="v1", cache_ttl=3600))
```
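
On a cache hit the step's function is not executed and the on-cache-hit hook fires. Changing the `cache_key` (say, `"v1"` to `"v2"`) is the natural way to invalidate stale results when a step's logic changes.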

---

## Context (Shared Data Store)

```python
from pipechain import Context

ctx = Context({"env": "production"})

# Steps read/write through context
ctx.set("step1.output", {"data": [1, 2, 3]})
ctx.get("step1.output")  # {"data": [1, 2, 3]}

# Namespaces
ctx.namespace("step1")  # {"output": {"data": [1, 2, 3]}}

# Snapshot & rollback
snap = ctx.snapshot()
ctx.set("temp", "value")
ctx.rollback(snap)  # Restores previous state
```

---

## CLI Usage

```bash
# Run a pipeline
pipechain run pipeline.py

# Dry run (validate without executing)
pipechain run pipeline.py --dry-run

# JSON output
pipechain run pipeline.py --json

# Filter by tags
pipechain run pipeline.py --tags etl critical

# Validate a pipeline file
pipechain validate pipeline.py

# Show execution plan
pipechain plan pipeline.py

# Visualize the DAG
pipechain visualize pipeline.py
```

---

## Async Support

```python
import asyncio
from pipechain import Pipeline, Step

async def async_fetch(ctx):
    await asyncio.sleep(0.1)  # Simulate I/O
    return {"data": "fetched"}

pipe = Pipeline("async_demo")
pipe.add_step(Step("fetch", async_fetch))

# Sync API (creates event loop internally)
result = pipe.run()

# Or await it directly from inside an async function
result = await pipe.async_run()
```
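
Because `run()` creates its own event loop, prefer `async_run()` when you are already inside a running loop (a Jupyter notebook, an async web handler, and so on).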

---

## Retry Policies

```python
from pipechain.retry import RetryPolicy

# No retries
RetryPolicy.none()

# Linear (constant delay)
RetryPolicy.linear(max_retries=3, delay=1.0)

# Exponential backoff with jitter
RetryPolicy.exponential(max_retries=5, base_delay=0.5, max_delay=30.0)

# Custom: only retry on specific exceptions
RetryPolicy(
    max_retries=3,
    base_delay=1.0,
    retry_on=(ConnectionError, TimeoutError),
    no_retry_on=(ValueError,),
)
```

---

## Architecture

```
pipechain/
  __init__.py       # Public API exports
  pipeline.py       # Pipeline orchestrator (DAG execution engine)
  step.py           # Step definition and @step decorator
  context.py        # Thread-safe shared data store
  dag.py            # DAG with topological sort and cycle detection
  result.py         # StepResult and PipelineResult types
  retry.py          # Retry policies (exponential, linear, custom)
  cache.py          # Cache backends (memory, file)
  hooks.py          # Lifecycle hook system
  cli.py            # Command-line interface
```
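
The heart of `dag.py` is a level-grouping topological sort. This is not pipechain's actual source, just a minimal sketch of the idea behind `execution_plan()`: repeatedly peel off the steps whose dependencies are all satisfied (Kahn-style), and if nothing can be peeled while steps remain, report a cycle.

```python
from typing import Mapping, Sequence

def execution_levels(deps: Mapping[str, Sequence[str]]) -> list[list[str]]:
    """Group steps into levels that can run in parallel.

    `deps` maps each step name to the names it depends on.
    Raises ValueError if the graph contains a cycle.
    """
    remaining = {name: set(d) for name, d in deps.items()}
    levels: list[list[str]] = []
    done: set[str] = set()
    while remaining:
        # Steps whose dependencies are all satisfied form the next level
        ready = sorted(n for n, d in remaining.items() if d <= done)
        if not ready:
            raise ValueError(f"cycle detected among: {sorted(remaining)}")
        levels.append(ready)
        done.update(ready)
        for n in ready:
            del remaining[n]
    return levels

print(execution_levels({
    "fetch_users": [],
    "fetch_orders": [],
    "fetch_products": [],
    "merge": ["fetch_users", "fetch_orders", "fetch_products"],
}))
# [['fetch_orders', 'fetch_products', 'fetch_users'], ['merge']]
```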

---

## API Reference

### Pipeline

| Method | Description |
|--------|-------------|
| `add_step(step)` | Add a Step or callable to the pipeline |
| `remove_step(name)` | Remove a step by name |
| `run(context, dry_run, tags)` | Execute the pipeline synchronously |
| `async_run(context, dry_run, tags)` | Execute the pipeline asynchronously |
| `validate()` | Check for dependency errors |
| `execution_plan()` | Get parallel execution groups |
| `visualize()` | ASCII DAG visualization |
| `on(hook_type, callback)` | Register a lifecycle hook |
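
A sketch of a tag-filtered dry run, assuming `run()` accepts the keyword names in the table above and `execution_plan()` returns an iterable of step groups:

```python
# Validate the plan for the "etl"-tagged subset without executing anything
result = pipe.run(dry_run=True, tags=["etl"])

# Inspect the parallel groups the engine would use
for group in pipe.execution_plan():
    print(group)
```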

### Step

| Parameter | Type | Description |
|-----------|------|-------------|
| `name` | `str` | Unique step identifier |
| `fn` | `Callable` | Function to execute (sync or async) |
| `depends_on` | `list[str]` | Step dependencies |
| `retry_policy` | `RetryPolicy` | Retry configuration |
| `cache_key` | `str` | Cache key for result caching |
| `cache_ttl` | `float` | Cache TTL in seconds |
| `condition` | `Callable` | Conditional execution predicate |
| `timeout` | `float` | Execution timeout in seconds |
| `tags` | `list[str]` | Tags for filtering |

### Context

| Method | Description |
|--------|-------------|
| `get(key, default)` | Get a value |
| `set(key, value)` | Set a value |
| `has(key)` | Check if key exists |
| `delete(key)` | Delete a key |
| `namespace(prefix)` | Get all values under a prefix |
| `snapshot()` | Take a snapshot |
| `rollback(id)` | Restore a snapshot |
| `merge(dict)` | Merge data into context |
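
A quick sketch of the methods the earlier Context example didn't cover:

```python
ctx = Context({})
ctx.merge({"env": "staging", "batch.size": 100})

ctx.has("env")           # True
ctx.delete("batch.size")
ctx.has("batch.size")    # False
```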

---

## License

MIT. See [LICENSE](LICENSE) for the full text.
