Metadata-Version: 2.4
Name: vcti-data-pipeline
Version: 2.0.0
Summary: ETL-style data processing framework with composable flow nodes — sources, transformers, reducers, and combiners
Author: Visual Collaboration Technologies Inc.
Requires-Python: <3.15,>=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: numpy>=1.24
Requires-Dist: vcti-datanode>=2.0.0
Provides-Extra: test
Requires-Dist: pytest; extra == "test"
Requires-Dist: pytest-cov; extra == "test"
Provides-Extra: lint
Requires-Dist: ruff; extra == "lint"
Provides-Extra: typecheck
Requires-Dist: mypy; extra == "typecheck"
Dynamic: license-file

# Data Pipeline

ETL-style data processing framework with composable flow nodes — sources, transformers, reducers, and combiners.

## Installation

```bash
pip install "vcti-data-pipeline>=2.0.0"
```

### In `pyproject.toml` dependencies

```toml
dependencies = [
    "vcti-data-pipeline>=2.0.0",
]
```

---

## Quick Start

```python
import numpy as np
from vcti.pipeline import DataSource, DataTransformer, DataRecord, data_record

# 1. Define a source
class CsvSource(DataSource):
    def __init__(self, path: str, *, name: str | None = None) -> None:
        super().__init__(name=name)
        self.path = path

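    def load(self) -> DataRecord:
        # names=True turns the header row into field names; dtype=None infers each column's type
        arr = np.genfromtxt(self.path, dtype=None, delimiter=",", names=True, encoding="utf-8")
        return data_record(arr)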

# 2. Define a transformer
class ScaleTransformer(DataTransformer):
    def __init__(self, factor: float, *, name: str | None = None) -> None:
        super().__init__(name=name)
        self.factor = factor

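    def transform(self, record: DataRecord) -> DataRecord:
        # Copy so the upstream record's array is not modified in place
        scaled = record.load().copy()
        # Assumes "value" is a float column; scaling an integer column in place raises a casting error
        scaled["value"] *= self.factor
        return data_record(scaled, record.attributes)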

# 3. Compose the flow right-to-left: the downstream node connects to its upstream input
source = CsvSource("data.csv", name="raw-csv")
scaler = ScaleTransformer(factor=2.0, name="2x-scaler")
flow = scaler.connect(source)

# 4. Execute the flow; result is a DataRecord (result.load() returns the scaled array)
result = flow.execute()
```
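
The right-to-left composition above reads most naturally as a pull model: the downstream node holds references to its inputs, and executing it executes them first. The toy class below sketches that model in plain Python and numpy, assuming `connect` registers its argument as an upstream input and returns the receiving node. `ToyNode` is hypothetical and is not how `vcti.pipeline` implements `FlowNode`.

```python
from collections.abc import Callable

import numpy as np


class ToyNode:
    """Hypothetical stand-in for a flow node; not the vcti.pipeline class."""

    def __init__(self, step: Callable[..., np.ndarray]) -> None:
        self.step = step
        self.inputs: list["ToyNode"] = []

    def connect(self, upstream: "ToyNode") -> "ToyNode":
        # Assumption: register `upstream` as an input and return self so calls can chain
        self.inputs.append(upstream)
        return self

    def execute(self) -> np.ndarray:
        # Pull-based evaluation: run every input first, then apply this node's step
        results = [node.execute() for node in self.inputs]
        return self.step(*results)


source = ToyNode(lambda: np.array([1.0, 2.0, 3.0]))  # leaf: no inputs
scaler = ToyNode(lambda arr: arr * 2.0)              # one input
flow = scaler.connect(source)
print(flow.execute())  # [2. 4. 6.]
```

Because `execute` recurses into its inputs, a node with several inputs (like a combiner) fits the same shape: its step simply receives one argument per connected input.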

### Combining multiple sources

```python
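from vcti.pipeline import DataCombiner

# IdSource and CoordSource are placeholders for your own DataSource
# subclasses, defined like CsvSource in the Quick Start
ids = IdSource()
coords = CoordSource()

# Connect each input to the combiner in turn
combiner = DataCombiner()
flow = combiner.connect(ids)
flow = flow.connect(coords)

combined = flow.execute()
# combined.load() holds the fields of both sources in one array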
```
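
Conceptually, merging "field-wise" means the combined record carries every field of every input, row for row. A minimal numpy sketch of that idea follows, assuming each input is a structured array with the same number of rows and distinct field names. `merge_fields` and the sample `ID`/`x`/`y` fields are hypothetical, and `DataCombiner`'s actual rules (for example on mismatched lengths) may differ.

```python
import numpy as np


def merge_fields(*arrays: np.ndarray) -> np.ndarray:
    """Merge equal-length structured arrays into one array holding all their fields.

    Hypothetical helper for illustration; it is not DataCombiner's implementation.
    """
    length = len(arrays[0])
    if any(len(a) != length for a in arrays):
        raise ValueError("all inputs must have the same number of rows")

    # Concatenate the field descriptions; np.dtype rejects duplicate field names
    fields = [(name, a.dtype[name]) for a in arrays for name in a.dtype.names]
    merged = np.empty(length, dtype=fields)

    # Copy each input field into the matching field of the result
    for a in arrays:
        for name in a.dtype.names:
            merged[name] = a[name]
    return merged


ids = np.array([(1,), (2,)], dtype=[("ID", "i8")])
coords = np.array([(0.0, 1.0), (2.0, 3.0)], dtype=[("x", "f8"), ("y", "f8")])
combined = merge_fields(ids, coords)
print(combined.dtype.names)  # ('ID', 'x', 'y')
```

For ad hoc work outside a flow, numpy also ships `numpy.lib.recfunctions.merge_arrays`, which handles the same case and pads shorter inputs with a fill value.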

### Iterating over cases

```python
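from vcti.pipeline import ForEachFlow

# reader, CasesSource and CaseDataSource are placeholders for your own
# reader object and DataSource subclasses
flow = ForEachFlow(
    keys=CasesSource(reader),  # source whose record lists the case keys
    factory=lambda case_id: CaseDataSource(reader, case_id),  # builds the flow for one case
    key_field="ID",  # field of the keys record that holds the case identifiers
)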

for case_id, case_flow in flow:
    record = case_flow.execute()
    # process each case...
```
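
One way to read the `keys` / `factory` / `key_field` trio: read the key column from the keys record, then call the factory for each key only when the loop reaches it. The generator below sketches that pattern under stated assumptions: the keys record is a structured array, `key_field` names the column holding the case IDs, and each "flow" is modeled as a zero-argument callable standing in for `execute()`. `for_each` and `make_case_flow` are hypothetical, and whether `ForEachFlow` itself builds flows lazily is not stated here.

```python
from collections.abc import Callable, Iterator
from typing import Any

import numpy as np


def for_each(
    keys: np.ndarray,
    factory: Callable[[Any], Callable[[], np.ndarray]],
    key_field: str,
) -> Iterator[tuple[Any, Callable[[], np.ndarray]]]:
    """Yield (key, flow) pairs, building one flow per key on demand.

    Hypothetical sketch of the iteration pattern; not ForEachFlow's implementation.
    """
    for key in keys[key_field]:
        case_id = key.item()  # numpy scalar -> plain Python value
        # The factory runs only when the loop reaches this key
        yield case_id, factory(case_id)


cases = np.array([(10,), (20,)], dtype=[("ID", "i8")])


def make_case_flow(case_id: int) -> Callable[[], np.ndarray]:
    # Hypothetical stand-in for CaseDataSource(reader, case_id).execute
    return lambda: np.full(3, case_id, dtype=float)


for case_id, run in for_each(cases, make_case_flow, key_field="ID"):
    print(case_id, run())
```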

---

## Flow Node Types

| Type | Inputs | Purpose |
|------|--------|---------|
| `DataSource` | 0 (leaf) | Load data from an external source |
| `DataTransformer` | 1 | Transform one record into another |
| `DataReducer` | 1+ | Combine multiple records into one |
| `DataCombiner` | 1+ | Merge arrays field-wise (extends `DataReducer`) |
| `DataTarget` | 1 | Persist the record, then pass it through unchanged |
| `ForEachFlow` | n/a | Iterate over keyed cases, building one flow per key |
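
The table distinguishes a generic reducer (several records in, one out) from the combiner, which specifically merges fields. As a contrast to field-wise merging, here is one hypothetical reduction that stacks rows instead. `concat_rows` assumes every input is a structured array with an identical dtype; it is not a class or function provided by `vcti.pipeline`.

```python
import numpy as np


def concat_rows(*records: np.ndarray) -> np.ndarray:
    """Reduce several structured arrays with the same dtype into one by stacking rows.

    Hypothetical reducer for illustration only.
    """
    first = records[0].dtype
    if any(r.dtype != first for r in records):
        raise ValueError("all inputs must share one dtype")
    # Row-wise reduction: the result has the inputs' fields and the sum of their lengths
    return np.concatenate(records)


part_a = np.array([(1, 0.5)], dtype=[("ID", "i8"), ("value", "f8")])
part_b = np.array([(2, 1.5), (3, 2.5)], dtype=[("ID", "i8"), ("value", "f8")])
merged = concat_rows(part_a, part_b)
print(merged["ID"])  # [1 2 3]
```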

### Other types

| Type | Purpose |
|------|---------|
| `DataRecord` | Alias for `DataNode`: the data payload flowing between nodes |
| `Column` | Frozen dataclass pairing a column name with a numpy dtype |
| `FlowNode` | Abstract base class for all flow nodes |
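
Because `Column` is described as a frozen dataclass pairing a name with a numpy dtype, a list of such pairs maps directly onto a numpy structured dtype. The sketch below uses a hypothetical `ToyColumn` with assumed fields `name` and `dtype`; the real `Column` may name its fields differently or offer its own conversion.

```python
from dataclasses import dataclass

import numpy as np


@dataclass(frozen=True)
class ToyColumn:
    """Hypothetical stand-in for Column: a column name paired with a numpy dtype."""

    name: str
    dtype: np.dtype


def structured_dtype(columns: list[ToyColumn]) -> np.dtype:
    # Each column becomes one (name, dtype) field of a structured dtype
    return np.dtype([(c.name, c.dtype) for c in columns])


schema = [ToyColumn("ID", np.dtype("i8")), ToyColumn("value", np.dtype("f8"))]
table = np.zeros(2, dtype=structured_dtype(schema))
print(table.dtype.names)  # ('ID', 'value')
```

Freezing the dataclass makes each column immutable and hashable, so a schema can be shared between nodes without one node silently changing another's field definitions.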

---

## Dependencies

- [numpy](https://numpy.org/) (>=1.24)
- [vcti-datanode](https://pypi.org/project/vcti-datanode/) (>=2.0.0): `DataNode` (aliased as `DataRecord`), plus `EagerDataSource` and `LazyDataSource`
