Metadata-Version: 2.4
Name: braided
Version: 0.0.1
Summary: A data pipeline library: YAML/JSON-configured DAGs of typed dict transformations.
Author: Koby Lewis
License: MIT
Project-URL: Homepage, https://github.com/Plyb/braided
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Typing :: Typed
Requires-Python: >=3.12
Description-Content-Type: text/markdown
Requires-Dist: typing_extensions>=4.13
Requires-Dist: jsonargparse>=4.30
Provides-Extra: jaxtyping
Requires-Dist: jaxtyping>=0.2; extra == "jaxtyping"
Provides-Extra: torch
Requires-Dist: torch>=2.0; extra == "torch"
Provides-Extra: datasets
Requires-Dist: datasets>=2.18; extra == "datasets"
Provides-Extra: dev
Requires-Dist: pytest>=8; extra == "dev"
Requires-Dist: pyright>=1.1.386; extra == "dev"
Requires-Dist: build>=1.2; extra == "dev"

# braided

A typed-dict pipeline library for Python. Define transformation DAGs in code or in YAML/JSON, and get automatic type checking, lazy evaluation, and optional caching.

## Contents

- [Install](#install)
- [Quickstart](#quickstart)
- [Strand kinds](#strand-kinds)
- [Custom inputs](#custom-inputs)
- [YAML / JSON config](#yaml--json-config)
- [Built-in strands](#built-in-strands)
- [Custom execution backends](#custom-execution-backends)

## Install

```bash
pip install braided
```

Optional extras:

| Extra | What it unlocks |
|---|---|
| `braided[datasets]` | Hugging Face Datasets map/cache backend (`braided.integrations.hf_datasets`) |
| `braided[torch]` | Tensor-safe pickle serialization in `Cache` |
| `braided[jaxtyping]` | Array dtype/shape awareness in the type checker |
| `braided[dev]` | pytest, pyright, and build tools |

## Quickstart

```python
from typing import Iterator, TypedDict

from braided import Node, NodeSpec, SequenceInput, execute_pipeline, strand

class Record(TypedDict):
    x: int

@strand
def double(item: Record) -> Record:
    return Record(x=item["x"] * 2)

@strand.one_to_many
def up_to(item: Record) -> Iterator[Record]:
    for i in range(item["x"]):
        yield Record(x=i)

nodes: NodeSpec[Record] = {
    "out": Node(function=up_to, args=["doubled"]),
    "doubled": Node(function=double, args=["seed"]),
}
result = execute_pipeline(nodes, {"seed": SequenceInput[Record]([Record(x=3)])})
print(list(result))
# [{'x': 0}, {'x': 1}, {'x': 2}, {'x': 3}, {'x': 4}, {'x': 5}]
```

## Strand kinds

| Decorator | Input → Output | Use for |
|---|---|---|
| `@strand` | `T → T'` | one-to-one row transforms |
| `@strand.one_to_many` | `T → Iterator[T']` | splitting or expanding rows |
| `@strand.many_to_many` | `Sequence[T] → Iterator[T']` | aggregations, joins, reordering |

A strand can take **multiple input sequences** by declaring multiple parameters. For `@strand` and `@strand.one_to_many`, inputs are aligned by position (zipped); for `@strand.many_to_many`, they are passed as separate sequences:

```python
class Pair(TypedDict):
    a: int
    b: int

@strand
def zip_add(left: Record, right: Record) -> Pair:
    return Pair(a=left["x"], b=right["x"])

nodes: NodeSpec[Pair] = {
    "out": Node(function=zip_add, args=["left", "right"]),
}
result = execute_pipeline(nodes, {
    "left": SequenceInput[Record]([Record(x=1), Record(x=2)]),
    "right": SequenceInput[Record]([Record(x=10), Record(x=20)]),
})
print(list(result))  # [{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]
```

Class-based strands inherit from `Strand[T].OneToOne()`, `.OneToMany()`, or `.ManyToMany()`. They can take constructor parameters:

```python
from braided import Strand

class Scale(Strand[Record].OneToOne()):
    def __init__(self, factor: int) -> None:
        self.factor = factor

    def __call__(self, item: Record) -> Record:
        return Record(x=item["x"] * self.factor)
```

## Custom inputs

Pipelines receive data through `PipelineInput` subclasses. `SequenceInput` wraps an in-memory list. For other data sources — files, databases, streaming APIs — subclass `PipelineInput` directly:

```python
from collections.abc import Iterator, Sequence
from typing import overload

from braided import PipelineInput

class CSVInput(PipelineInput[Record]):
    def __init__(self, path: str) -> None:
        import csv
        # newline="" is what the csv module recommends for files passed to its readers.
        with open(path, newline="") as f:
            self._rows = [Record(x=int(r["x"])) for r in csv.DictReader(f)]

    def __len__(self) -> int:
        return len(self._rows)

    def __iter__(self) -> Iterator[Record]:
        return iter(self._rows)

    @overload
    def __getitem__(self, index: int) -> Record: ...
    @overload
    def __getitem__(self, index: slice) -> Sequence[Record]: ...
    def __getitem__(self, index: int | slice) -> Record | Sequence[Record]:
        return self._rows[index]
```

Pass it like any other input:

```python
result = execute_pipeline(nodes, {"seed": CSVInput("data.csv")})
```

Custom inputs can also be instantiated from YAML config (see [YAML / JSON config](#yaml--json-config)) as long as their constructor arguments use concrete types that jsonargparse can resolve.

## YAML / JSON config

Pipelines can be defined in YAML or JSON and loaded at runtime. The `function` field accepts either a dotted import path or, for class-based strands, a dict with `class_path` and `init_args` keys.

```yaml
# pipeline.yaml
nodes:
  out:
    function:
      class_path: mypackage.Scale
      init_args:
        factor: 10
    args: [doubled]
  doubled:
    function: mypackage.double
    args: [seed]

inputs:
  seed:
    class_path: mypackage.CSVInput
    init_args:
      path: data.csv
```

```python
from braided import execute_pipeline_from_config

result = list(execute_pipeline_from_config("pipeline.yaml"))
```

## Built-in strands

### `Cache`

`Cache` is a pass-through strand that persists its input sequence to disk on the first run and reloads it on subsequent runs, skipping upstream computation:

```python
from braided import Cache, Node, NodeSpec, SequenceInput, execute_pipeline

nodes: NodeSpec[Record] = {
    "out": Node(function=Cache[Record]("/tmp/my_cache"), args=["source"]),
    "source": Node(function=double, args=["seed"]),
}
# First run: computes and saves to disk (list() forces the lazy result).
list(execute_pipeline(nodes, {"seed": SequenceInput[Record]([Record(x=1), Record(x=2)])}))
# Second run: loads from disk; "source" is never evaluated.
result = list(execute_pipeline(nodes, {"seed": SequenceInput[Record]([])}))
```
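The compute-once, reload-afterwards behavior can be sketched in plain Python as follows. This illustrates the idea only and is not how `Cache` is implemented: the helper `cached`, the single pickle file, and the file name are all assumptions of the sketch.

```python
import pickle
import tempfile
from collections.abc import Callable
from pathlib import Path
from typing import Any

Row = dict[str, Any]


def cached(path: Path, compute: Callable[[], list[Row]]) -> list[Row]:
    """Return rows from `path` if it exists; otherwise compute and persist them."""
    if path.exists():
        # Cache hit: upstream work is skipped entirely.
        with path.open("rb") as f:
            return pickle.load(f)
    rows = compute()
    with path.open("wb") as f:
        pickle.dump(rows, f)
    return rows


calls: list[str] = []  # records each time the upstream computation really runs


def expensive() -> list[Row]:
    calls.append("ran")
    return [{"x": 2}, {"x": 4}]


with tempfile.TemporaryDirectory() as tmp:
    cache_file = Path(tmp) / "rows.pkl"
    first = cached(cache_file, expensive)   # computes and writes the file
    second = cached(cache_file, expensive)  # reads the file; expensive() is not called
```

One consequence of this design, visible in the example above as well: once a cache exists, changes to the upstream input have no effect until the cached files are removed.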

### `join` / `Join`

Inner join on a shared key column:

```python
from braided import Join, Node, NodeSpec, SequenceInput, execute_pipeline

nodes: NodeSpec[dict] = {
    "out": Node(function=Join[dict]("id"), args=["left", "right"]),
}
result = execute_pipeline(nodes, {
    "left": SequenceInput[dict]([{"id": 1, "val": "a"}]),
    "right": SequenceInput[dict]([{"id": 1, "score": 42}]),
})
print(list(result))  # [{'id': 1, 'val': 'a', 'score': 42}]
```
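As a reference for what an inner join on a key computes, here is a plain-Python hash-join sketch. It is not the `Join` implementation; `inner_join` is a hypothetical helper, and letting right-hand values win on conflicting column names is an assumption of the sketch.

```python
from collections import defaultdict
from collections.abc import Iterator, Sequence
from typing import Any

Row = dict[str, Any]


def inner_join(left: Sequence[Row], right: Sequence[Row], key: str) -> Iterator[Row]:
    # Index the right side by key so each left row is matched in O(1) on average.
    index: dict[Any, list[Row]] = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    for row in left:
        # Rows whose key has no match on the other side are dropped (inner join).
        for match in index.get(row[key], []):
            # On conflicting column names the right-hand value wins (sketch's choice).
            yield {**row, **match}


left = [{"id": 1, "val": "a"}, {"id": 2, "val": "b"}]
right = [{"id": 1, "score": 42}, {"id": 3, "score": 7}]
joined = list(inner_join(left, right, "id"))
print(joined)
```

Only `id` 1 appears on both sides, so `joined` contains a single merged row; ids 2 and 3 are dropped.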

## Custom execution backends

Pass custom `map`, `flat_map`, or `many_to_many` callables to `execute_pipeline` to control how sequences are materialized. For example, to use the Hugging Face Datasets backend (with `nodes` and `inputs` defined as in the earlier examples):

```python
from braided.integrations.hf_datasets import hf_map_funcs

result = execute_pipeline(nodes, inputs, **hf_map_funcs())
```
