Metadata-Version: 2.1
Name: grpipe
Version: 0.0.3
Summary: Computation pipelines with non-linear dependency graphs
Author-email: Dominik Schröder <schroeder.dominik@gmail.com>
Project-URL: Homepage, https://wirhabenzeit.github.io/grpipe/
Project-URL: Repository, https://github.com/wirhabenzeit/grpipe
Project-URL: Documentation, https://wirhabenzeit.github.io/grpipe/
Keywords: python
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: <3.13,>=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: frozendict>=2.4.6
Requires-Dist: marimo>=0.9.10
Requires-Dist: networkx>=3.2.1
Requires-Dist: polars
Requires-Dist: numpy>=2.0.2
Requires-Dist: xxhash>=3.5.0
Requires-Dist: pandas>=2.2.3

# GRPipe: A Flexible Pipeline Framework for Python

GRPipe is a minimalistic and flexible computational pipeline framework for Python that allows you to create complex data processing workflows. It provides a clean and intuitive API for defining, connecting, and executing pipeline steps, with built-in support for caching, argument binding, and parameter management.

[![Release](https://img.shields.io/github/v/release/wirhabenzeit/grpipe)](https://img.shields.io/github/v/release/wirhabenzeit/grpipe)
[![Build status](https://img.shields.io/github/actions/workflow/status/wirhabenzeit/grpipe/main.yml?branch=main)](https://github.com/wirhabenzeit/grpipe/actions/workflows/main.yml?query=branch%3Amain)
[![codecov](https://codecov.io/gh/wirhabenzeit/grpipe/branch/main/graph/badge.svg)](https://codecov.io/gh/wirhabenzeit/grpipe)
[![Commit activity](https://img.shields.io/github/commit-activity/m/wirhabenzeit/grpipe)](https://img.shields.io/github/commit-activity/m/wirhabenzeit/grpipe)
[![License](https://img.shields.io/github/license/wirhabenzeit/grpipe)](https://img.shields.io/github/license/wirhabenzeit/grpipe)

Computation pipelines with non-linear dependency graphs

- **GitHub repository**: <https://github.com/wirhabenzeit/grpipe/>
- **Documentation**: <https://wirhabenzeit.github.io/grpipe/>

## Features

- Easy-to-use decorator-based API for defining pipeline steps
- Automatic dependency resolution between steps
- Built-in caching mechanism for improved performance
- Flexible argument binding and parameter management
- Support for both linear and branching pipelines
- Verbose logging option for debugging and monitoring

## Installation

You can install GRPipe using pip:

```bash
pip install grpipe
```

## Quickstart

### Arguments vs. Parameters

In GRPipe, there's an important distinction between arguments and parameters:

- **Arguments** are either external inputs passed to the final pipeline or outputs from other internal steps. They are defined using the `Argument` class and connected between steps.
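- **Parameters** are tunable settings for individual steps. They are function arguments that are not connected through `args` and typically have a default value (such as `operation="mean"` in the example below). You change them on the pipeline with `set_params`, addressing each one as `<step name>__<parameter name>`.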

Here's an example illustrating the difference:

```python
import numpy as np
from grpipe import Argument, step

# Define arguments
data = Argument("data")
threshold = Argument("threshold")


# `operation` is a parameter of the step that can be adjusted
@step(args={"input_data": data, "cutoff": threshold})
def process_data(input_data, cutoff, operation="mean"):
    if operation == "mean":
        result = input_data[input_data > cutoff].mean()
    elif operation == "std":
        result = input_data[input_data > cutoff].std()
    else:
        raise ValueError("Invalid operation")
    return result


@step(args={"processed": process_data})
def format_result(processed):
    return f"The result is: {processed:.2f}"


# Create the pipeline
pipeline = format_result

# Run the pipeline with different arguments
result1 = pipeline(data=np.arange(100), threshold=2)
print(result1)

# Adjust a parameter
pipeline.set_params(process_data__operation="std")

# Run again with the same arguments but different parameters
result2 = pipeline(data=np.arange(100), threshold=2)
print(result2)

# Bind the value of threshold:
pipeline.bind(threshold=50)

result3 = pipeline(data=np.arange(100))
print(result3)
```

In this example, `data` and `threshold` are arguments supplied when the pipeline is called (or fixed in advance with `bind`, as done for `threshold`), while `operation` is a parameter adjusted with `set_params`.
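The split between arguments and parameters can be made mechanical. The sketch below is a hypothetical illustration, not GRPipe's implementation. It assumes that any function argument not listed in `args` but having a default is a parameter, and it groups `set_params`-style keys of the form `<step>__<param>` by step. The helpers `split_signature` and `route_params` are invented for this example.

```python
import inspect


def split_signature(func, args):
    """Hypothetical helper: separate a step's arguments from its parameters.

    Arguments are the names wired up in ``args``; parameters are the
    remaining function arguments that have default values.
    """
    arguments, parameters = [], {}
    for name, p in inspect.signature(func).parameters.items():
        if name in args:
            arguments.append(name)
        elif p.default is not inspect.Parameter.empty:
            parameters[name] = p.default
    return arguments, parameters


def route_params(**overrides):
    """Hypothetical helper: group '<step>__<param>' keys by step name."""
    routed = {}
    for key, value in overrides.items():
        # Split on the first double underscore only
        step_name, sep, param = key.partition("__")
        if not sep:
            raise ValueError(f"expected '<step>__<param>', got {key!r}")
        routed.setdefault(step_name, {})[param] = value
    return routed


def process_data(input_data, cutoff, operation="mean"):
    ...


# The args values are placeholders here; only the keys matter
arguments, parameters = split_signature(process_data, {"input_data": None, "cutoff": None})
print(arguments)   # ['input_data', 'cutoff']
print(parameters)  # {'operation': 'mean'}
print(route_params(process_data__operation="std"))  # {'process_data': {'operation': 'std'}}
```

Splitting on the first `__` matches the `process_data__operation` key used above. A step whose own name contained `__` would need a different separator.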

### Branching Pipelines

GRPipe supports creating complex branching pipelines. Here's an example of a more advanced pipeline for data analysis:

```python
import polars as pl
from grpipe import Argument, step

# Define arguments
data_source = Argument("data_source")
analysis_type = Argument("analysis_type")

@step(args={"source": data_source})
def load_data(source):
    return pl.read_csv(source)

@step(args={"data": load_data})
def preprocess(data, fill_value=0):
    return data.drop_nulls().fill_nan(fill_value)

@step(args={"data": preprocess})
def calculate_statistics(data):
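    # Each aggregation returns a one-row DataFrame; row(0, named=True)
    # turns it into a {column: value} dict for readable output
    return {
        "mean": data.mean().row(0, named=True),
        "median": data.median().row(0, named=True),
        "std_dev": data.std().row(0, named=True),
    }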

@step(args={"data": preprocess})
def perform_clustering(data, n_clusters=3):
    # Implement your clustering logic here
    return {"clusters": [f"Cluster {i}" for i in range(n_clusters)]}

@step(
    args={
        "stats": calculate_statistics,
        "clusters": perform_clustering,
        "analysis": analysis_type,
    }
)
def generate_report(stats, clusters, analysis):
    report = f"Analysis Type: {analysis}\n\n"
    report += "Statistics:\n"
    for key, value in stats.items():
        report += f"  {key}: {value}\n"
    report += f"\nNumber of clusters: {len(clusters['clusters'])}"
    return report

# Create the pipeline
analysis_pipeline = generate_report

# Run the pipeline
result = analysis_pipeline(
    data_source="path/to/your/data.csv",
    analysis_type="Exploratory Data Analysis"
)
print(result)
```

This example builds a pipeline with data loading, preprocessing, two independent analysis branches (statistics calculation and clustering) that both consume the output of `preprocess`, and a report step that joins them. It also combines arguments (`data_source`, `analysis_type`) with parameters that have default values (`fill_value`, `n_clusters`).
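The branching example forms a diamond: `preprocess` feeds two steps, and `generate_report` joins them again. As a rough illustration of what automatic dependency resolution involves (not necessarily how GRPipe does it internally), the standard library's `graphlib` can order the steps so that each one comes after its inputs. The dependency mapping below is transcribed by hand from the example.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps (or external arguments) it depends on,
# transcribed from the branching example above.
dependencies = {
    "load_data": {"data_source"},
    "preprocess": {"load_data"},
    "calculate_statistics": {"preprocess"},
    "perform_clustering": {"preprocess"},
    "generate_report": {"calculate_statistics", "perform_clustering", "analysis_type"},
}

# static_order() yields every node once, dependencies first;
# the relative order of independent nodes is not fixed.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Because the order is computed over the whole graph, `preprocess` appears only once even though two branches consume it.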

### Caching and Performance

GRPipe automatically caches the results of each step, which speeds up repeated runs with the same inputs. You can control caching behavior per argument with the `cachable` flag of `Argument`:

```python
from grpipe import Argument, step

frequently_changing_data = Argument("data", cachable=False)
stable_parameter = Argument("parameter", cachable=True)

@step(args={"data": frequently_changing_data, "param": stable_parameter})
def process_data(data, param):
    # Your processing logic here
    pass
```
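To illustrate the idea behind step-level caching, here is a minimal memoization decorator keyed on a step's input values. This sketch is not GRPipe's cache and rests on several assumptions. The `memoize_step` name is hypothetical. Inputs must be hashable: arrays such as `np.arange(100)` are not, so a practical cache would need to hash their contents instead. The sketch also does not model the `cachable` flag.

```python
import functools


def memoize_step(func):
    """Hypothetical sketch: cache a step's result per distinct set of inputs."""
    cache = {}

    @functools.wraps(func)
    def wrapper(**kwargs):
        # Sort keyword arguments so the key does not depend on call order;
        # values must be hashable for this simple key to work.
        key = tuple(sorted(kwargs.items()))
        if key not in cache:
            cache[key] = func(**kwargs)
        return cache[key]

    return wrapper


calls = []


@memoize_step
def process_data(data, param):
    calls.append((data, param))  # record real executions
    return data * param


print(process_data(data=3, param=2))  # 6, computed
print(process_data(param=2, data=3))  # 6, served from the cache
print(len(calls))                     # 1
```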

### Verbose Logging

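Pass `verbose=True` to `step` to enable verbose logging for that step, which is useful for debugging and monitoring:

```python
from grpipe import Argument, step

input_data = Argument("input_data")


@step(verbose=True, args={"data": input_data})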
def noisy_step(data):
    # This step will log detailed information about its execution
    pass
```
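What a verbose step actually reports is up to GRPipe. As a hypothetical sketch of the general technique, the decorator below uses Python's standard `logging` module to record a step's inputs, result, and run time when `verbose=True`. The `logged_step` name and the message format are invented for this example and are not GRPipe's output.

```python
import functools
import logging
import time

logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s")


def logged_step(verbose=False):
    """Hypothetical decorator factory: optionally log each call of a step."""

    def decorator(func):
        logger = logging.getLogger(func.__name__)

        @functools.wraps(func)
        def wrapper(**kwargs):
            if not verbose:
                return func(**kwargs)
            logger.info("called with %r", kwargs)
            start = time.perf_counter()
            result = func(**kwargs)
            elapsed = time.perf_counter() - start
            logger.info("returned %r in %.4f s", result, elapsed)
            return result

        return wrapper

    return decorator


@logged_step(verbose=True)
def noisy_step(data):
    return sum(data)


noisy_step(data=[1, 2, 3])  # logs the call and its result to stderr
```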

### Generating Flowcharts

GRPipe can generate flowcharts of your pipeline with the `pipeline.draw()` method, which returns the flowchart as Mermaid markdown. Mermaid diagrams can be rendered by many markdown viewers, including GitHub.

Here's an example of how to generate a flowchart for a simple pipeline:

```python
from grpipe import Argument, step, Pipeline

# Define arguments
data = Argument("data")
threshold = Argument("threshold")

@step(args={"input_data": data, "cutoff": threshold})
def process_data(input_data, cutoff, operation="mean"):
    if operation == "mean":
        result = input_data[input_data > cutoff].mean()
    elif operation == "std":
        result = input_data[input_data > cutoff].std()
    else:
        raise ValueError("Invalid operation")
    return result

@step(args={"processed": process_data})
def format_result(processed):
    return f"The result is: {processed:.2f}"

# Create the pipeline
pipeline = Pipeline(format_result)

# Generate the flowchart
flowchart = pipeline.draw(params=True)
print(flowchart)
```

Printing `flowchart` shows the Mermaid source, which can be pasted into any Mermaid renderer. Setting `params=True` includes step parameters (such as `operation`) in the flowchart.
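To show roughly what Mermaid flowchart text looks like, here is a hand-rolled sketch that turns a dependency mapping into a `flowchart TD` diagram. It is not `pipeline.draw()`, and GRPipe's actual node labels and styling may differ. The `to_mermaid` function is hypothetical, and the edges are transcribed by hand from the example above.

```python
def to_mermaid(dependencies):
    """Hypothetical sketch: render {node: [inputs]} as a Mermaid flowchart."""
    lines = ["flowchart TD"]
    for node, inputs in dependencies.items():
        for source in inputs:
            # One "source --> target" edge per dependency
            lines.append(f"    {source} --> {node}")
    return "\n".join(lines)


dependencies = {
    "process_data": ["data", "threshold"],
    "format_result": ["process_data"],
}
print(to_mermaid(dependencies))
# Prints:
# flowchart TD
#     data --> process_data
#     threshold --> process_data
#     process_data --> format_result
```

Placing such text inside a fenced code block tagged `mermaid` lets GitHub render it as a diagram.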

## License

GRPipe is released under the MIT License. See the [LICENSE](LICENSE) file for details.
