Metadata-Version: 2.1
Name: scalable_pypeline
Version: 2.2.7
Summary: PypeLine - Python pipelines for the Real World
Home-page: https://gitlab.com/bravos2/pypeline
Author: Bravos Power Corporation
License: Apache License 2.0
Description-Content-Type: text/markdown
Provides-Extra: build
Provides-Extra: flask
Provides-Extra: web
Provides-Extra: workers
Provides-Extra: dev
Provides-Extra: test
License-File: LICENSE

```
______ __   ________  _____  _     _____  _   _  _____
| ___ \\ \ / /| ___ \|  ___|| |   |_   _|| \ | ||  ___|
| |_/ / \ V / | |_/ /| |__  | |     | |  |  \| || |__
|  __/   \ /  |  __/ |  __| | |     | |  | . ` ||  __|
| |      | |  | |    | |___ | |_____| |_ | |\  || |___
\_|      \_/  \_|    \____/ \_____/\___/ \_| \_/\____/
```

PypeLine is a Python library for building and running distributed data pipelines. It provides:

- **DAG Pipelines** — define tasks as a directed acyclic graph; PypeLine handles execution order and parallelism
- **Scheduled Tasks** — run recurring jobs on a cron schedule
- **Flask API** — built-in REST API with OpenAPI/Swagger documentation for triggering and managing pipelines
- **Job Runner** — one-shot worker for running a single pipeline task (e.g., in a Kubernetes Job)

PypeLine uses [Dramatiq](https://dramatiq.io/) for task queuing, RabbitMQ (or Redis) as the message broker, and Redis for result storage.

---

## Table of Contents

- [Requirements](#requirements)
- [Installation](#installation)
- [Project Structure](#project-structure)
- [1. Define Your Tasks](#1-define-your-tasks)
- [2. Configure `pypeline.yaml`](#2-configure-pypelineyaml)
- [3. Set Up Your Flask App](#3-set-up-your-flask-app)
- [4. Set Environment Variables](#4-set-environment-variables)
- [5. Run the Services](#5-run-the-services)
- [Configuration Reference](#configuration-reference)
- [API Authentication](#api-authentication)
- [Testing](#testing)

---

## Requirements

- Python 3.8+
- RabbitMQ (default broker) or Redis (as broker)
- Redis (for result storage — always required)

**Docker Compose** is the easiest way to run RabbitMQ and Redis locally. See [Run the Services](#5-run-the-services) below.

---

## Installation

Install with the extras you need:

```bash
# Full install (Flask API + web server + pipeline workers).
# Quoting keeps shells such as zsh from treating [...] as a glob pattern.
pip install "scalable-pypeline[flask,web,workers]"

# Workers only (no Flask API)
pip install "scalable-pypeline[workers]"
```

Available extras:

| Extra | Installs |
|-------|----------|
| `flask` | Flask, flask-smorest (OpenAPI) |
| `web` | gunicorn, gevent (production web server) |
| `workers` | Dramatiq, APScheduler, networkx (pipeline execution) |
| `dev` | black |
| `test` | pytest, tox, coverage |

---

## Project Structure

PypeLine expects your application to be a Python package with a `pypeline.yaml` file inside it:

```
my_app/
├── my_app/
│   ├── __init__.py          # must define __version__
│   ├── pypeline.yaml        # PypeLine configuration
│   ├── pipeline.py          # your pipeline task functions
│   └── scheduled_tasks.py   # your scheduled task functions
├── app.py                   # Flask application factory
├── extensions.py            # Dramatiq extension
└── my_app.env               # environment variables
```

---

## 1. Define Your Tasks

Tasks are plain Python functions. Pipeline tasks receive an `event` argument containing the pipeline's input data. Scheduled tasks take no arguments.

**`my_app/pipeline.py`**
```python
def step_a(event):
    # event contains the data passed when the pipeline was triggered
    data = event.get("my_input")
    print(f"Step A processing: {data}")


def step_b(event):
    print("Step B running (depends on A)")


def step_c(event):
    print("Step C running (depends on A, runs in parallel with B)")
```
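
Because tasks are plain functions, you can exercise them locally before wiring up a broker. This is a minimal sketch, assuming `event` behaves like a dict (as the `event.get(...)` call above suggests). The task functions are copied inline so the snippet runs without the `my_app` package, and `run_locally` is a hypothetical helper that calls them in dependency order in the current process; it does not go through Dramatiq.

```python
import io
from contextlib import redirect_stdout


# Inline copies of two task functions from my_app/pipeline.py
def step_a(event):
    data = event.get("my_input")
    print(f"Step A processing: {data}")


def step_b(event):
    print("Step B running (depends on A)")


def run_locally(event):
    """Hypothetical helper: call tasks in dependency order and capture their output."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        step_a(event)
        step_b(event)
    return buffer.getvalue().splitlines()


lines = run_locally({"my_input": "hello"})
print(lines)
# → ['Step A processing: hello', 'Step B running (depends on A)']
```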

**`my_app/scheduled_tasks.py`**
```python
def hourly_report():
    print("Running hourly report...")
```

---

## 2. Configure `pypeline.yaml`

Place `pypeline.yaml` inside your Python package directory (next to `__init__.py`).

There are two pipeline schema versions. Use `schemaVersion: 1` for simple pipelines and `schemaVersion: 2` when you need multiple handlers per task node or typed pipeline settings.

### Schema Version 1

Each task node maps to a single handler function. This is the simplest option and covers most use cases.

```yaml
serviceConfig:
    - name: pipeline-worker
      registeredTasks:
          - handler: my_app.pipeline.step_a
          - handler: my_app.pipeline.step_b
          - handler: my_app.pipeline.step_c
          - handler: my_app.scheduled_tasks.hourly_report

pipelines:
    my-pipeline:
        name: My Pipeline
        description: An example pipeline with parallel steps
        schemaVersion: 1
        config:
            # DAG adjacency: step_a runs first, then step_b and step_c in parallel
            dagAdjacency:
                step_a:
                    - step_b
                    - step_c
            metadata:
                maxRetry: 3
                retryBackoff: 60        # seconds before first retry
                retryBackoffMax: 300    # max backoff seconds
                retryJitter: true
                maxTtl: 3600           # task time-to-live in seconds
                queue: pipeline-queue
            taskDefinitions:
                step_a:
                    handler: my_app.pipeline.step_a
                step_b:
                    handler: my_app.pipeline.step_b
                    queue: step-b-queue  # optional per-task queue override
                step_c:
                    handler: my_app.pipeline.step_c

scheduledTasks:
    hourly-report:
        name: Hourly Report
        enabled: true
        schemaVersion: 1
        config:
            task: my_app.scheduled_tasks.hourly_report
            queue: pipeline-queue
            schedule:
                minute: '0'
                hour: '*'
                dayOfWeek: '*'
                dayOfMonth: '*'
                monthOfYear: '*'
```
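
To see how `dagAdjacency` implies execution order, here is an illustrative sketch (not PypeLine's scheduler) that groups nodes into "waves" with Kahn's topological sort. Every node in a wave has all of its upstream tasks finished, so nodes within one wave can run in parallel. The function name `execution_waves` is hypothetical.

```python
from typing import Dict, List


def execution_waves(dag_adjacency: Dict[str, List[str]]) -> List[List[str]]:
    """Group DAG nodes into waves; nodes in the same wave can run in parallel."""
    # Collect every node, including leaves that only appear as children.
    nodes = set(dag_adjacency)
    for children in dag_adjacency.values():
        nodes.update(children)

    # Count incoming edges (upstream tasks not yet finished) per node.
    in_degree = {node: 0 for node in nodes}
    for children in dag_adjacency.values():
        for child in children:
            in_degree[child] += 1

    waves = []
    ready = sorted(n for n, d in in_degree.items() if d == 0)
    while ready:
        waves.append(ready)
        next_ready = []
        for node in ready:
            for child in dag_adjacency.get(node, []):
                in_degree[child] -= 1
                if in_degree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Nodes stuck with unfinished upstream tasks mean the graph has a cycle.
    if sum(len(w) for w in waves) != len(nodes):
        raise ValueError("dagAdjacency contains a cycle")
    return waves


print(execution_waves({"step_a": ["step_b", "step_c"]}))
# → [['step_a'], ['step_b', 'step_c']]
```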

### Schema Version 2

Version 2 adds two capabilities:

1. **Multiple handlers per task node** (`handlers` list instead of `handler` string). When triggering a pipeline you can specify which handler index to use for each node — this enables scenario-based execution (e.g. run an alternate algorithm for a given task without changing the pipeline structure).
2. **Typed `settings` schema** — define and validate the input parameters your pipeline accepts.

```yaml
pipelines:
    my-pipeline-v2:
        name: My Pipeline V2
        description: Pipeline with swappable task handlers and validated settings
        schemaVersion: 2
        config:
            dagAdjacency:
                step_a:
                    - step_b
                    - step_c
            metadata:
                maxRetry: 3
                maxTtl: 3600
                queue: pipeline-queue
            taskDefinitions:
                step_a:
                    # Index 0: default handler; index 1: alternate implementation
                    handlers:
                        - my_app.pipeline.step_a_default
                        - my_app.pipeline.step_a_alternate
                step_b:
                    handlers:
                        - my_app.pipeline.step_b
                step_c:
                    handlers:
                        - my_app.pipeline.step_c

            # Optional: define typed, validated input settings for this pipeline
            settings:
                required:
                    - threshold
                properties:
                    threshold:
                        dataType: float
                        inputType: text
                        label: Decision Threshold
                        minimum: 0.0
                        maximum: 1.0
                    mode:
                        dataType: string
                        inputType: dropdown
                        label: Processing Mode
                        options:
                            - label: Fast
                              value: fast
                            - label: Accurate
                              value: accurate
```
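
The sketch below illustrates what the `required`, `dataType`, `minimum`/`maximum`, and `options` fields of the `settings` block express. It is a hypothetical stand-alone validator (`validate_settings`), not PypeLine's implementation: it handles only the `float` and `string` data types used in the example, and it ignores keys that are not declared under `properties`.

```python
from typing import Any, Dict, List

# The `settings` block from the YAML above, as a Python dict (UI-only keys omitted).
SETTINGS = {
    "required": ["threshold"],
    "properties": {
        "threshold": {"dataType": "float", "minimum": 0.0, "maximum": 1.0},
        "mode": {
            "dataType": "string",
            "options": [
                {"label": "Fast", "value": "fast"},
                {"label": "Accurate", "value": "accurate"},
            ],
        },
    },
}

# Only the two data types used in this example are mapped.
PY_TYPES = {"float": (int, float), "string": (str,)}


def validate_settings(values: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable errors; an empty list means valid."""
    errors = []
    for name in schema.get("required", []):
        if name not in values:
            errors.append(f"missing required setting: {name}")

    for name, value in values.items():
        spec = schema["properties"].get(name)
        if spec is None:
            continue  # this sketch ignores undeclared keys
        # bool is a subclass of int, so reject it explicitly for numeric types.
        if isinstance(value, bool) or not isinstance(value, PY_TYPES[spec["dataType"]]):
            errors.append(f"{name}: expected {spec['dataType']}")
            continue
        if "minimum" in spec and value < spec["minimum"]:
            errors.append(f"{name}: below minimum {spec['minimum']}")
        if "maximum" in spec and value > spec["maximum"]:
            errors.append(f"{name}: above maximum {spec['maximum']}")
        allowed = [opt["value"] for opt in spec.get("options", [])]
        if allowed and value not in allowed:
            errors.append(f"{name}: must be one of {allowed}")
    return errors


print(validate_settings({"threshold": 0.5, "mode": "fast"}, SETTINGS))  # → []
print(validate_settings({"threshold": 1.5}, SETTINGS))
# → ['threshold: above maximum 1.0']
```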

Every handler referenced in a `handlers` list must also be registered under `serviceConfig[].registeredTasks`:

```yaml
serviceConfig:
    - name: pipeline-worker
      registeredTasks:
          - handler: my_app.pipeline.step_a_default
          - handler: my_app.pipeline.step_a_alternate
          - handler: my_app.pipeline.step_b
          - handler: my_app.pipeline.step_c
```
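
When a node lists several `handlers`, the trigger request picks one by index. The request format for that selection isn't covered here, so this sketch is hypothetical: `handler_indexes` stands in for whatever per-node selection you send, and nodes without an entry fall back to index 0 (the default handler, per the YAML comment above). It then resolves the dotted path with `importlib`, the standard-library way to turn a string such as `my_app.pipeline.step_a_default` into a callable. Stdlib paths stand in for `my_app` so the snippet runs anywhere.

```python
import importlib
from typing import Callable, Dict, List


def pick_handler(task_definitions: Dict[str, Dict[str, List[str]]],
                 node: str,
                 handler_indexes: Dict[str, int]) -> str:
    """Return the dotted handler path chosen for `node` (index 0 if unspecified)."""
    handlers = task_definitions[node]["handlers"]
    index = handler_indexes.get(node, 0)
    if not 0 <= index < len(handlers):
        raise IndexError(f"{node} has no handler at index {index}")
    return handlers[index]


def resolve(dotted_path: str) -> Callable:
    """Import 'package.module.function' and return the function object."""
    module_path, _, attr = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr)


# Stdlib stand-ins for my_app.pipeline.step_a_default / step_a_alternate.
task_definitions = {"step_a": {"handlers": ["math.floor", "math.ceil"]}}

default_fn = resolve(pick_handler(task_definitions, "step_a", {}))
alternate_fn = resolve(pick_handler(task_definitions, "step_a", {"step_a": 1}))
print(default_fn(2.5), alternate_fn(2.5))  # → 2 3
```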

**Environment variable interpolation** — use `${VAR_NAME}` or `${VAR_NAME:default}` anywhere in `pypeline.yaml`:

```yaml
metadata:
    queue: ${TASK_QUEUE:pipeline-queue}
```
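
The substitution rule can be illustrated with a short regex-based sketch. It is not PypeLine's loader; the exact pattern and the handling of an unset variable with no default (left untouched here) are assumptions.

```python
import os
import re
from typing import Mapping, Optional

# Matches ${VAR_NAME} and ${VAR_NAME:default}; the default may be empty.
_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def interpolate(text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace ${VAR} / ${VAR:default} placeholders using `env` (default: os.environ)."""
    env = os.environ if env is None else env

    def substitute(match: "re.Match") -> str:
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        return match.group(0)  # assumption: leave unresolved placeholders as-is

    return _PATTERN.sub(substitute, text)


print(interpolate("queue: ${TASK_QUEUE:pipeline-queue}", env={}))
# → queue: pipeline-queue
print(interpolate("queue: ${TASK_QUEUE:pipeline-queue}", env={"TASK_QUEUE": "fast-lane"}))
# → queue: fast-lane
```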

---

## 3. Set Up Your Flask App

**`extensions.py`** — create the Dramatiq extension:

```python
from pypeline.dramatiq import Dramatiq

dramatiq = Dramatiq()
```

**`app.py`** — create the Flask application factory:

```python
from flask import Flask
from pypeline.flask import FlaskPypeline
from extensions import dramatiq


def create_app():
    app = Flask(__name__)
    app.config.from_envvar("APP_CONFIG")  # or app.config.from_object(...)

    # Initialize Dramatiq broker (must happen before FlaskPypeline)
    dramatiq.init_app(app)

    # Initialize PypeLine — pass init_api=True to enable the REST API + Swagger UI
    pypeline = FlaskPypeline()
    pypeline.init_app(app, init_api=True)

    # Optionally register your own API blueprints into the PypeLine API
    # app.extensions["pypeline_core_api"].register_blueprint(my_bp)

    return app


if __name__ == "__main__":
    app = create_app()
    app.run(port=5001)
```

When `init_api=True`, PypeLine registers:
- `GET/POST /api/v1/pipelines` — list and trigger pipelines
- `GET/POST /api/v1/schedules` — list and manage scheduled tasks
- Swagger UI at `/api/v1/docs`

---

## 4. Set Environment Variables

```env
# Required: tells PypeLine which package contains your pypeline.yaml
PYPELINE_CLIENT_PKG_NAME=my_app

# Redis URL (used for result storage, always required)
# Redis URL (a Redis instance is always required for result storage)
REDIS_URL=redis://:password@localhost:6379/0

# Message broker: RABBITMQ (default) or REDIS
MESSAGE_BROKER=RABBITMQ
RABBIT_URL=amqp://admin:password@localhost:5672

# Optional: protect API endpoints with an access key
API_ACCESS_KEY=your-secret-key

# Optional: choose which serviceConfig entry this worker loads (defaults to the first entry)
# WORKER_NAME=pipeline-worker
```
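
If you want a worker or container to fail fast on a misconfigured environment, a small preflight check can read the same variables with the defaults listed in the Configuration Reference. `load_pypeline_env` is a hypothetical helper for your own startup code, not part of PypeLine; it accepts any mapping so it can be tested without touching `os.environ`.

```python
import os
from typing import Dict, Mapping, Optional

# Defaults copied from the Configuration Reference table.
DEFAULTS = {
    "REDIS_URL": "redis://localhost:6379/0",
    "MESSAGE_BROKER": "RABBITMQ",
    "RABBIT_URL": "amqp://admin:password@127.0.0.1:5672",
}


def load_pypeline_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Resolve the documented variables, failing fast on missing or invalid values."""
    env = os.environ if env is None else env
    if not env.get("PYPELINE_CLIENT_PKG_NAME"):
        raise RuntimeError("PYPELINE_CLIENT_PKG_NAME must be set")

    settings = {key: env.get(key, default) for key, default in DEFAULTS.items()}
    settings["PYPELINE_CLIENT_PKG_NAME"] = env["PYPELINE_CLIENT_PKG_NAME"]

    if settings["MESSAGE_BROKER"] not in ("RABBITMQ", "REDIS"):
        raise RuntimeError(
            f"MESSAGE_BROKER must be RABBITMQ or REDIS, got {settings['MESSAGE_BROKER']!r}"
        )
    return settings


config = load_pypeline_env({"PYPELINE_CLIENT_PKG_NAME": "my_app"})
print(config["MESSAGE_BROKER"], config["REDIS_URL"])
# → RABBITMQ redis://localhost:6379/0
```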

---

## 5. Run the Services

### Start Infrastructure

```bash
# docker-compose.yml (example)
# services: rabbitmq, redis
docker compose up -d
```

### Run the Flask API

```bash
# Development
flask --app app run --port 5001

# Production
gunicorn "app:create_app()" --bind 0.0.0.0:5001 --worker-class gevent
```

### Run Pipeline Workers

Workers consume tasks from the queues defined in `pypeline.yaml`:

```bash
flask --app app pypeline-worker

# Options:
#   -p / --processes   number of worker processes (default: CPU count)
#   -t / --threads     threads per process (default: 8)
#   -Q / --queues      comma-separated list of queues to consume from
flask --app app pypeline-worker -p 2 -t 4 -Q pipeline-queue
```

### Run the Cron Scheduler

The scheduler reads your `scheduledTasks` config and enqueues tasks on their cron schedules:

```bash
flask --app app cron-scheduler
```
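
The five `schedule` keys correspond to the usual cron fields. To sanity-check a schedule, you can render them in the standard `minute hour day-of-month month day-of-week` crontab order. The helper below is purely illustrative (`to_crontab` is a hypothetical name) and does not reproduce how the scheduler itself evaluates the fields; treating missing keys as `*` is an assumption of this sketch.

```python
from typing import Mapping

# Order of fields in a standard 5-field crontab expression.
CRON_FIELD_ORDER = ("minute", "hour", "dayOfMonth", "monthOfYear", "dayOfWeek")


def to_crontab(schedule: Mapping[str, str]) -> str:
    """Render a pypeline.yaml `schedule` block as a crontab-style string.

    Missing keys are treated as '*'. Numeric dayOfWeek values are not portable:
    classic cron uses 0=Sunday, while some schedulers use 0=Monday.
    """
    return " ".join(str(schedule.get(field, "*")) for field in CRON_FIELD_ORDER)


hourly = {"minute": "0", "hour": "*", "dayOfWeek": "*", "dayOfMonth": "*", "monthOfYear": "*"}
print(to_crontab(hourly))  # → 0 * * * *
```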

### Run a One-Shot Job (Kubernetes Jobs)

The `job-runner` CLI starts a single worker that processes one task and exits — useful for Kubernetes Jobs:

```bash
# Listen on a specific queue; exits after processing one task
job-runner -q pipeline-queue

# With an idle timeout (exit if no job arrives within 30 seconds)
job-runner -q pipeline-queue --idle-timeout-ms 30000
```

---

## Configuration Reference

### `pypeline.yaml` Structure

| Field | Required | Description |
|-------|----------|-------------|
| `serviceConfig` | Yes | List of worker definitions. Each entry has a `name` and `registeredTasks`. |
| `serviceConfig[].registeredTasks` | Yes | List of task handlers (`handler: module.path.function`) this worker loads. |
| `pipelines` | No | Dict of pipeline definitions keyed by pipeline ID. |
| `scheduledTasks` | No | Dict of scheduled task definitions keyed by task ID. |

### Pipeline `config` Fields

| Field | Description |
|-------|-------------|
| `dagAdjacency` | Dict mapping each task to a list of tasks that run after it. |
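| `taskDefinitions` | Dict mapping task names to their handler function paths (`handler` string in schema v1, `handlers` list in schema v2). |
| `taskDefinitions.<task>.queue` | Optional queue override for a single task. |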
| `metadata.maxRetry` | Max number of retries on failure. |
| `metadata.retryBackoff` | Seconds before first retry. |
| `metadata.retryBackoffMax` | Max backoff seconds. |
| `metadata.retryJitter` | Add random jitter to retry delays. |
| `metadata.maxTtl` | Max task lifetime in seconds before it is discarded. |
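| `metadata.queue` | RabbitMQ/Redis queue name for this pipeline's tasks. |
| `settings` | Schema v2 only: typed input settings (`required` list plus per-setting `properties`). |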
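
The retry fields combine into a delay schedule. The exact formula PypeLine applies is not documented here, so this sketch assumes a common capped exponential backoff: the first retry waits `retryBackoff` seconds, each later retry doubles the wait, no wait exceeds `retryBackoffMax`, and `retryJitter` picks a random delay between zero and that cap ("full jitter"). `retry_delays` is a hypothetical helper for reasoning about the settings, not part of PypeLine.

```python
import random
from typing import List, Optional


def retry_delays(max_retry: int,
                 backoff: float,
                 backoff_max: float,
                 jitter: bool,
                 rng: Optional[random.Random] = None) -> List[float]:
    """Delay in seconds before each retry under an assumed capped exponential policy."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retry):
        capped = min(backoff * 2 ** attempt, backoff_max)
        # Full jitter: any value in [0, capped]; without jitter, wait exactly `capped`.
        delays.append(rng.uniform(0, capped) if jitter else capped)
    return delays


# Values from the schema v1 example (maxRetry: 3, retryBackoff: 60, retryBackoffMax: 300)
print(retry_delays(3, 60, 300, jitter=False))  # → [60, 120, 240]
print(retry_delays(5, 60, 300, jitter=False))  # → [60, 120, 240, 300, 300]
```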

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `PYPELINE_CLIENT_PKG_NAME` | — | **Required.** Your package name (directory containing `pypeline.yaml`). |
| `REDIS_URL` | `redis://localhost:6379/0` | Redis connection URL. |
| `MESSAGE_BROKER` | `RABBITMQ` | Broker type: `RABBITMQ` or `REDIS`. |
| `RABBIT_URL` | `amqp://admin:password@127.0.0.1:5672` | RabbitMQ connection URL. |
| `API_ACCESS_KEY` | — | If set, API endpoints require this key in the `accesskey` header. |
| `WORKER_NAME` | first `serviceConfig` entry | Name of the `serviceConfig` entry whose `registeredTasks` this worker loads. |
| `PYPELINE_YAML_PATH` | `pypeline.yaml` | Relative path to `pypeline.yaml` within the package. |
| `IDLE_TIMEOUT_MS` | `0` (infinite) | `job-runner` only: exit if no job starts within this many milliseconds. |
| `RESTRICT_WORKER_SHUTDOWN_WHILE_JOBS_RUNNING` | `false` | Enable graceful shutdown — prevents workers from stopping mid-task. |

---

## API Authentication

Protect your API by setting `API_ACCESS_KEY` in the environment. Clients must include the key in requests:

```bash
curl -H "accesskey: your-secret-key" http://localhost:5001/api/v1/pipelines
```
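
From Python, the same request can be made with the standard library. This is a minimal sketch: the URL and header mirror the curl example, `list_pipelines_request` and `list_pipelines` are hypothetical helper names, and because the response body's shape isn't documented here, the sketch simply decodes it as JSON.

```python
import json
import urllib.request

BASE_URL = "http://localhost:5001/api/v1"


def list_pipelines_request(access_key: str) -> urllib.request.Request:
    """Build a GET request for /pipelines carrying the `accesskey` header."""
    return urllib.request.Request(
        f"{BASE_URL}/pipelines",
        headers={"accesskey": access_key},
        method="GET",
    )


def list_pipelines(access_key: str):
    """Send the request and decode the JSON response (requires the API to be running)."""
    with urllib.request.urlopen(list_pipelines_request(access_key), timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Usage, with the Flask API running locally:
# print(list_pipelines("your-secret-key"))
```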

You can also use the `require_accesskey` decorator on your own endpoints:

```python
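from flask import Blueprint
from pypeline.flask.decorators import require_accesskey

# Your own blueprint; register it with the app (or the PypeLine API) as usual
bp = Blueprint("my_bp", __name__)


@bp.route("/my-endpoint")
@require_accesskey  # applied after @bp.route so the registered view is the protected one
def my_endpoint():
    return {"status": "ok"}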

---

## Testing

Install test dependencies:

```bash
pip install -e ".[test]"
```

Run tests:

```bash
tox
# or directly with pytest
pytest
```
