# GoldenPipe — Full API Reference

> Pipeline framework for data quality — compose Check, Flow, Match stages with pluggable routing and YAML config.
> See also: [llms.txt](llms.txt) for a concise overview.

## Install

```bash
pip install goldenpipe                      # Standalone (no tools included)
pip install "goldenpipe[golden-suite]"      # With GoldenCheck, GoldenFlow, GoldenMatch (quoted so shells like zsh do not expand the brackets)
```

## Quick Start

```python
import goldenpipe as gp

# Run a pipeline from a YAML config
result = gp.run("pipeline.yaml", "data.csv")
print(f"Status: {result.status}, Stages: {len(result.stages)}")

# Run on an in-memory DataFrame (df must already be loaded)
result = gp.run_df(df, config="pipeline.yaml")

# Run specific stages only
result = gp.run_stages(["check", "flow"], "data.csv")
```

## High-Level API

```python
from goldenpipe import (
    run,          # run(config_path, data_path) -> PipeResult
    run_df,       # run_df(df, *, config=None) -> PipeResult
    run_stages,   # run_stages(stage_names, data_path) -> PipeResult
)
```

## Pipeline Class

```python
from goldenpipe import Pipeline

pipe = Pipeline(config)
pipe.add_stage(my_stage)
result = pipe.execute(data_path)
```

## Context and Result Models

```python
from goldenpipe import (
    PipeContext,   # PipeContext: df, findings, config, metadata (flows between stages)
    PipeResult,    # PipeResult: status, stages (list of StageResult), final_df
    StageResult,   # StageResult: stage_name, status, duration, findings, df
    Decision,      # Decision: action ("continue"|"skip"|"abort"), reason
    StageStatus,   # Enum: PENDING, RUNNING, COMPLETED, SKIPPED, FAILED
    PipeStatus,    # Enum: PENDING, RUNNING, COMPLETED, FAILED
)
```
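
As a rough illustration of how these models fit together, the sketch below uses stand-in classes that mirror only the names listed above. They are not GoldenPipe's actual classes: the enum values, the field types (for example `duration` as a float in seconds), and the helper `failed_stages` are all assumptions made for this example.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, List


class StageStatus(Enum):
    # Member names come from the reference above; the values are arbitrary.
    PENDING = auto()
    RUNNING = auto()
    COMPLETED = auto()
    SKIPPED = auto()
    FAILED = auto()


@dataclass
class StageResult:
    # Stand-in with the documented field names; the types are assumed.
    stage_name: str
    status: StageStatus
    duration: float = 0.0
    findings: List[Any] = field(default_factory=list)
    df: Any = None


def failed_stages(stages: List[StageResult]) -> List[str]:
    """Return the names of stages whose status is FAILED (hypothetical helper)."""
    return [s.stage_name for s in stages if s.status is StageStatus.FAILED]


stages = [
    StageResult("check", StageStatus.COMPLETED, duration=1.2),
    StageResult("flow", StageStatus.SKIPPED),
    StageResult("match", StageStatus.FAILED, duration=0.4),
]
print(failed_stages(stages))
```

With the real library, the same kind of loop would presumably run over `result.stages` from a `PipeResult`.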

## Stage System

```python
from goldenpipe import (
    StageInfo,   # StageInfo: name, description, input_type, output_type
    Stage,       # Stage base class
    stage,       # @stage(name, description) decorator for function-based stages
)
```

### Defining a custom stage

```python
import polars as pl
from goldenpipe import stage, PipeContext

@stage(name="my_filter", description="Remove invalid rows")
def my_filter(ctx: PipeContext) -> PipeContext:
    ctx.df = ctx.df.filter(pl.col("status") != "invalid")
    return ctx
```
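
To make the decorator pattern concrete, here is a minimal sketch of how a function-based stage decorator of this shape could attach metadata to a function. It is not GoldenPipe's implementation: the stand-in `StageInfo` carries only two of the documented fields, the attribute name `info` is hypothetical, and the context is a plain dict instead of a `PipeContext`.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class StageInfo:
    # Stand-in carrying two of the documented StageInfo fields.
    name: str
    description: str


def stage(name: str, description: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Return a decorator that tags a function with stage metadata."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # `info` is a hypothetical attribute name chosen for this sketch.
        func.info = StageInfo(name=name, description=description)
        return func
    return decorator


@stage(name="strip_names", description="Trim whitespace from the name field")
def strip_names(ctx: dict) -> dict:
    # The context here is a plain dict with a "rows" list, not a PipeContext.
    ctx["rows"] = [{**row, "name": row["name"].strip()} for row in ctx["rows"]]
    return ctx


out = strip_names({"rows": [{"name": "  Ada "}]})
print(strip_names.info.name, out["rows"])
```

The decorated function stays directly callable, which keeps stages easy to unit test outside a pipeline.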

## Config

```python
from goldenpipe import (
    StageSpec,        # StageSpec: name, enabled, params
    PipelineConfig,   # PipelineConfig: stages (list of StageSpec), decisions
    load_config,      # load_config(path) -> PipelineConfig
)
```

## Built-in Decision Functions

```python
from goldenpipe import (
    severity_gate,   # severity_gate(ctx, *, max_errors=0) -> Decision
    pii_router,      # pii_router(ctx) -> Decision  (route based on PII detection)
    row_count_gate,  # row_count_gate(ctx, *, min_rows=1) -> Decision
)
```
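
A custom decision function would plausibly follow the same `(ctx, *, ...) -> Decision` shape as the built-ins. The sketch below is an assumption-heavy illustration, not the behavior of `severity_gate`: `Decision` is a stand-in dataclass, `error_count_gate` is a hypothetical name, each finding is assumed to be a dict with a `"severity"` key, and returning `"abort"` when the limit is exceeded is a choice made for this example.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any


@dataclass
class Decision:
    # Stand-in with the two documented fields.
    action: str  # "continue", "skip", or "abort"
    reason: str


def error_count_gate(ctx: Any, *, max_errors: int = 0) -> Decision:
    """Abort when more than `max_errors` findings have severity "error".

    Assumes ctx.findings is a list of dicts with a "severity" key;
    the real findings structure may differ.
    """
    errors = sum(1 for f in ctx.findings if f.get("severity") == "error")
    if errors > max_errors:
        return Decision("abort", f"{errors} errors exceed the limit of {max_errors}")
    return Decision("continue", f"{errors} errors within the limit of {max_errors}")


# A minimal stand-in context with two errors and one warning.
ctx = SimpleNamespace(findings=[
    {"severity": "error"},
    {"severity": "warning"},
    {"severity": "error"},
])
print(error_count_gate(ctx))
print(error_count_gate(ctx, max_errors=2))
```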

## Common Usage Patterns

### Full pipeline: Check -> Flow -> Match
```python
import goldenpipe as gp

result = gp.run("pipeline.yaml", "customers.csv")

for stage_result in result.stages:
    print(f"  {stage_result.stage_name}: {stage_result.status} ({stage_result.duration:.1f}s)")

if result.status == gp.PipeStatus.COMPLETED:
    result.final_df.write_csv("golden_records.csv")
```

### Conditional routing with decision functions
```yaml
# pipeline.yaml
stages:
  - name: check
    enabled: true
  - name: flow
    enabled: true
    decision: severity_gate
    params:
      max_errors: 5
  - name: match
    enabled: true
    decision: row_count_gate
    params:
      min_rows: 10
```
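
The routing that this config implies can be sketched in plain Python. Everything below is an assumption for illustration: the config is a dict mirror of the YAML, the two gate functions are simplified stand-ins that return only an action string, `params` are assumed to be passed to the stage's decision function, and each decision is assumed to run before the stage it is attached to. No stage actually executes in this sketch, so the context never changes.

```python
from typing import Any, Callable, Dict, List, Tuple

# Dict mirror of the YAML config above.
config = {
    "stages": [
        {"name": "check", "enabled": True},
        {"name": "flow", "enabled": True,
         "decision": "severity_gate", "params": {"max_errors": 5}},
        {"name": "match", "enabled": True,
         "decision": "row_count_gate", "params": {"min_rows": 10}},
    ]
}


def severity_gate_sketch(ctx: Dict[str, Any], *, max_errors: int = 0) -> str:
    # Assumed behavior: abort the run when there are too many errors.
    return "abort" if ctx["error_count"] > max_errors else "continue"


def row_count_gate_sketch(ctx: Dict[str, Any], *, min_rows: int = 1) -> str:
    # Assumed behavior: skip the stage when there are too few rows.
    return "skip" if ctx["row_count"] < min_rows else "continue"


REGISTRY: Dict[str, Callable[..., str]] = {
    "severity_gate": severity_gate_sketch,
    "row_count_gate": row_count_gate_sketch,
}


def plan(config: Dict[str, Any], ctx: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return (stage name, action) pairs, stopping at the first abort."""
    outcome = []
    for spec in config["stages"]:
        if not spec.get("enabled", True):
            outcome.append((spec["name"], "skip"))
            continue
        gate = REGISTRY.get(spec.get("decision"))
        # Stages without a decision always continue.
        action = gate(ctx, **spec.get("params", {})) if gate else "continue"
        outcome.append((spec["name"], action))
        if action == "abort":
            break
    return outcome


print(plan(config, {"error_count": 3, "row_count": 4}))
print(plan(config, {"error_count": 9, "row_count": 250}))
```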

### Custom stage pipeline
```python
import polars as pl
from goldenpipe import Pipeline, PipelineConfig, PipeContext, StageSpec, stage

@stage(name="enrich", description="Add computed columns")
def enrich(ctx: PipeContext) -> PipeContext:
    ctx.df = ctx.df.with_columns(
        (pl.col("first_name") + " " + pl.col("last_name")).alias("full_name")
    )
    return ctx

config = PipelineConfig(stages=[
    StageSpec(name="check"),
    StageSpec(name="enrich"),
    StageSpec(name="match"),
])
pipe = Pipeline(config)
pipe.add_stage(enrich)
result = pipe.execute("data.csv")
```

### PII-aware routing
```python
from goldenpipe import run, pii_router

# The pipeline routes to PPRL (privacy-preserving record linkage) when PII is detected
result = run("pipeline.yaml", "sensitive_data.csv")
```

## Configuration Example (pipeline.yaml)

```yaml
stages:
  - name: check
    enabled: true

  - name: flow
    enabled: true
    decision: severity_gate
    params:
      max_errors: 0

  - name: match
    enabled: true
    params:
      fuzzy:
        name: 0.85
        address: 0.80
```

## Pipeline Flow

```
load_file -> GoldenCheck.scan_file(path) -> decide_flow(findings)
  -> if fixable: GoldenFlow.transform_df(df) -> updated df
  -> decide_match(findings, row_count, strategy_override)
  -> GoldenMatch.dedupe_df(df) or AgentSession.deduplicate(path)
  -> PipeResult
```

## CLI Commands

```bash
goldenpipe run pipeline.yaml data.csv         # Run full pipeline
goldenpipe run pipeline.yaml data.csv --dry   # Dry run (validate only)
goldenpipe validate pipeline.yaml             # Validate config
goldenpipe stages                             # List available stages
goldenpipe explain pipeline.yaml              # Explain pipeline flow
goldenpipe serve                              # Start REST API
goldenpipe mcp-serve                          # Start MCP server
goldenpipe agent-serve --port 8250            # Start A2A server
```

## Interfaces

- **MCP Server**: `goldenpipe mcp-serve` — 4 tools (run, validate, list stages, explain)
- **Remote MCP**: https://goldenpipe-mcp-production.up.railway.app/mcp/ (4 tools, Smithery: https://smithery.ai/servers/benzsevern/goldenpipe)
- **A2A Server**: `goldenpipe agent-serve --port 8250` — 4 skills via Agent-to-Agent protocol
- **REST API**: `goldenpipe serve` on port 8000
- **CLI**: 8 Typer commands
- **Python API**: `import goldenpipe` — 15 exports

## Links

- [GitHub](https://github.com/benzsevern/goldenpipe)
- [Concise overview](llms.txt)
