Metadata-Version: 2.4
Name: dataio-rs
Version: 0.4.7
Classifier: Programming Language :: Rust
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Dist: numpy>=2.0,<3 ; extra == 'bench'
Requires-Dist: pillow>=10,<12 ; extra == 'bench'
Requires-Dist: torch>=2.6,<2.10 ; extra == 'bench'
Requires-Dist: torchvision>=0.21,<0.25 ; extra == 'bench'
Requires-Dist: opencv-python-headless>=4.10,<5 ; extra == 'bench'
Requires-Dist: safetensors>=0.4.5,<0.7 ; extra == 'bench'
Requires-Dist: dataio-rs[bench] ; extra == 'bench-dali'
Requires-Dist: nvidia-dali-cuda120>=1.44,<1.52 ; extra == 'bench-dali'
Requires-Dist: dataio-rs[bench] ; extra == 'bench-spdl'
Requires-Dist: spdl>=0.0.8,<0.1 ; python_full_version >= '3.10' and extra == 'bench-spdl'
Requires-Dist: dataio-rs[bench] ; extra == 'bench-tensorflow'
Requires-Dist: tensorflow-cpu>=2.18,<2.21 ; extra == 'bench-tensorflow'
Provides-Extra: bench
Provides-Extra: bench-dali
Provides-Extra: bench-spdl
Provides-Extra: bench-tensorflow
Summary: High-performance data processing library for ML workloads.
Home-Page: https://github.com/Mikubill/dataio-rs
Requires-Python: >=3.11
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Package, https://pypi.org/project/dataio-rs/
Project-URL: Repository, https://github.com/Mikubill/dataio-rs

# dataio

[![CI](https://github.com/Mikubill/dataio-rs/actions/workflows/ci.yml/badge.svg)](https://github.com/Mikubill/dataio-rs/actions/workflows/ci.yml)
[![PyPI](https://img.shields.io/pypi/v/dataio-rs.svg)](https://pypi.org/project/dataio-rs/)
[![Python](https://img.shields.io/pypi/pyversions/dataio-rs.svg)](https://pypi.org/project/dataio-rs/)

A Rust-backed data loader for ML training. Declarative pipelines, native
decode + transform, zero-copy tensors via DLPack, and the same URI surface
for local files, S3/R2, GCS, Azure, and Hugging Face.

```python
import dataio

pipe = dataio.pipeline(
    dataio.slot("image")
        .decode_image(mode="rgb")
        .resize_short(256)
        .center_crop(h=224, w=224)
        .normalize("imagenet"),
    dataio.batch(64, prefetch=4),
)

with pipe.bind(records).run("torch", concurrency=64) as loader:
    for batch in loader:
        train_step(batch["image"])
```

## Install

```bash
pip install dataio-rs
```

Requires Python 3.11 or newer. Wheels are CPU-safe by default; CUDA,
nvJPEG, and cuFile / GDS are discovered through `dlopen` at import time.
To print a capability report:

```bash
python -m dataio
```

## Pipeline

A pipeline is a static plan made of optional record maps, one or more
slot chains, exactly one shape stage (`batch` / `collate` / `group_by`),
and optional post-batch maps or native batch transforms.

```python
pipe = dataio.pipeline(
    dataio.map(lambda r: {"image": r["path"], "label": r["class_id"]}),

    dataio.slot("image")
        .decode_image(mode="rgb")
        .resize_short(256)
        .center_crop(h=224, w=224)
        .normalize("imagenet"),

    dataio.slot("label").decode_bytes(),
    dataio.batch(64, drop_last=True, prefetch=4),
)
```

Op factories are available as namespaces:

| Namespace | Purpose |
|---|---|
| `dataio.decoders` | image, audio, json, npy, safetensors, text, bytes, pt |
| `dataio.transforms` | resize, crop, normalize, augment, conditional ops |
| `dataio.batch_transforms` | mixup, cutmix, padding, shuffle, normalization |
| `dataio.cond` | predicates for slot-local conditional transforms and skips |

Slot methods like `.decode_image()`, `.resize()`, `.normalize()` are
shorthand for the same native ops.

### Slot-local conditionals

`dataio.cond` builds native predicates that compile into the plan
signature. Field predicates run before decode, so they can gate optional
slots; shape predicates run after decode and live inside transform
branches.

```python
from dataio import cond as c

pipe = dataio.pipeline(
    dataio.slot("image").decode_image(mode="rgb"),
    dataio.slot("mask", optional=True)
        .active_when(c.col("mask_id").is_not_null())
        .decode_image(mode="gray"),
    dataio.batch(32),
)

# Shape-driven branching after decode
dataio.slot("image") \
    .decode_image(mode="rgb") \
    .when(c.shape(axis=0).gt(512)) \
    .then(dataio.transforms.center_crop(h=512, w=512)) \
    .otherwise(dataio.transforms.resize(h=512, w=512))
```
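
The ordering of the two predicate kinds can be illustrated with a small pure-Python sketch. It does not use dataio and is not how the native plan is compiled; `fake_decode`, `run_slot`, and `branch` are hypothetical helpers, and shapes stand in for decoded pixels.

```python
decoded = []  # records which payloads were actually decoded


def fake_decode(payload):
    """Stand-in decoder: returns a (height, width) shape instead of pixels."""
    decoded.append(payload)
    return payload["shape"]


def run_slot(record, field, active_when=None):
    # Field predicate: evaluated on the raw record, before any decode work.
    if active_when is not None and not active_when(record):
        return None
    return fake_decode(record[field])


def branch(shape, when, then, otherwise):
    # Shape predicate: evaluated on the decoded result, picks one transform.
    return then(shape) if when(shape) else otherwise(shape)


record = {
    "image": {"shape": (600, 400)},
    "mask": {"shape": (600, 400)},
    "mask_id": None,
}

image_shape = run_slot(record, "image")
mask_shape = run_slot(record, "mask", active_when=lambda r: r["mask_id"] is not None)

result = branch(
    image_shape,
    when=lambda s: s[0] > 512,
    then=lambda s: ("center_crop", 512, 512),
    otherwise=lambda s: ("resize", 512, 512),
)
```

Because `mask_id` is null, the mask slot is skipped without being decoded, while the image takes the crop branch only after its shape is known.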

### Grouped batches

When an upstream sampler decides batch boundaries, `group_by` emits one
batch per distinct key, in arrival order. Pair it with
`dataio.from_sampler`, which adapts a PyTorch-style `BatchSampler` (or
any iterable yielding lists of rows), and with the optional `context=`
callback on `run(...)`, which surfaces per-record sidecar data on
`batch.context`:

```python
pipe = dataio.pipeline(
    dataio.slot("latent").decode_safetensors(tensor_key=dataio.col("tensor_key")),
    dataio.group_by("batch_id", prefetch=4),
)

records = dataio.from_sampler(
    batch_sampler,
    lambda row: {
        "key": row["key"],
        "latent": row["cache_path"],
        "tensor_key": row["tensor_key"],
        "_row": row,
    },
)

with pipe.bind(records).run("torch", concurrency=128,
                            context=lambda r: r["_row"]) as loader:
    for batch in loader:
        latents = batch["latent"]
        rows = batch.context  # list[dict] aligned with batch.keys
```

`from_sampler` injects the `batch_id` field for you. Its second argument
(`record_fn`, the lambda in the example above) is optional; omit it when
the sampler already yields valid record dicts.
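
As a rough mental model of the grouping rule, here is a minimal pure-Python sketch. It is not dataio's implementation: `tag_batches` and `group_in_arrival_order` are hypothetical helpers, and the sketch assumes that rows sharing a `batch_id` arrive contiguously, as they would from a batch sampler.

```python
from itertools import groupby
from operator import itemgetter


def tag_batches(batch_sampler):
    """Flatten sampler output, stamping each row with the index of its batch."""
    for batch_id, rows in enumerate(batch_sampler):
        for row in rows:
            yield {**row, "batch_id": batch_id}


def group_in_arrival_order(records, key="batch_id"):
    """Yield one list of records per run of equal `key`, preserving arrival order."""
    for _, run in groupby(records, key=itemgetter(key)):
        yield list(run)


# A stand-in for a BatchSampler: any iterable yielding lists of rows.
sampler = [
    [{"key": "a"}, {"key": "b"}],
    [{"key": "c"}],
    [{"key": "d"}, {"key": "e"}],
]

batches = list(group_in_arrival_order(tag_batches(sampler)))
sizes = [len(batch) for batch in batches]
```

Batch sizes follow the sampler's own boundaries rather than a fixed `batch_size`, which is the point of using a key-driven shape stage.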

## Runtime

`pipe.bind(records).run(...)` returns a context-managed loader. Worker
pools are process-wide and configured once via environment variables;
per-loader knobs control admission and prefetch:

| Knob | Meaning |
|---|---|
| `run(..., concurrency=N)` | per-loader record admission window |
| `dataio.batch(..., prefetch=K)` | number of batches in flight |
| `DATAIO_CPU_WORKERS` | size of the shared CPU decode/transform pool |
| `DATAIO_IO_WORKERS` | size of the shared async IO runtime |
| `DATAIO_OBJECT_SHARDS` | HTTP/object-store shard fanout |
| `DATAIO_S3_HTTP_VERSION` | `h1` or `h2` transport selection |

The default, `concurrency="auto"`, is enough for local data and moderate
batch sizes; raise it for remote object reads or other high-latency
sources. Output preserves submit order by default; pass
`order="completion"` to yield batches as soon as they finish.
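
The trade-off between the two ordering modes can be seen with the standard library alone. This is only an analogy built on `concurrent.futures`, not dataio's scheduler; `load` and its delays are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def load(item):
    """Pretend to fetch a batch; `delay` stands in for variable IO latency."""
    name, delay = item
    time.sleep(delay)
    return name


work = [("slow", 0.2), ("fast", 0.0), ("medium", 0.1)]

with ThreadPoolExecutor(max_workers=3) as pool:
    # Submit order: results come back in the order the work was submitted,
    # so a slow first item holds back the ones queued behind it.
    in_submit_order = list(pool.map(load, work))

    # Completion order: each result is yielded as soon as it finishes.
    futures = [pool.submit(load, item) for item in work]
    in_completion_order = [f.result() for f in as_completed(futures)]
```

Submit order gives reproducible batch sequences; completion order trades that for lower latency when some records are much slower than others.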

### Error handling

```python
loader = pipe.bind(records).run("torch", on_failure="skip")
```

| `on_failure` | Behavior |
|---|---|
| `"strict"` | raise on the first row error |
| `"skip"` | drop failed rows, raise if a batch has no survivors |
| `0`, `1`, ... | require at least that many survivors |

Per-row errors land on `batch.errors`. For automatic per-error logging,
pass `log_errors=True` (routes through the `dataio.rows` logger) or a
callable for custom handling:

```python
loader = pipe.bind(records).run("torch", log_errors=True)
loader = pipe.bind(records).run("torch", log_errors=my_logger.warning)
```
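
One way to read the `on_failure` table is as a minimum-survivor threshold per batch. The sketch below encodes that reading in plain Python; `apply_on_failure` is a hypothetical helper, and treating `"skip"` as a threshold of one is an interpretation of the table, not a statement about dataio internals.

```python
def apply_on_failure(rows, errors, on_failure="strict"):
    """Return the surviving rows or raise, following the three modes in the table.

    rows:   rows that were processed successfully
    errors: exceptions raised by rows that failed
    """
    if on_failure == "strict":
        if errors:
            raise errors[0]  # raise on the first row error
        return rows
    # "skip" needs at least one survivor; an integer is used as the threshold.
    min_survivors = 1 if on_failure == "skip" else int(on_failure)
    if len(rows) < min_survivors:
        raise RuntimeError(
            f"batch has {len(rows)} survivors, need at least {min_survivors}"
        )
    return rows


survivors = apply_on_failure(["row-a"], [ValueError("bad jpeg")], on_failure="skip")
```

Under this reading, `on_failure=0` never raises for an empty batch, while `"skip"` does.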

### Distributed training

```python
loader = pipe.bind(records).run(
    "torch",
    shard={"id": rank, "of": world_size, "pad": True},
    seed=42,
    concurrency=64,
)
```

`pad=True` keeps every rank at the same number of batches; `seed`
drives shuffling and native batch-transform RNG. `state_dict()` /
`load_state_dict()` support checkpoint resume; mid-batch resume replays
the partial batch from its first sample so transform RNG remains
aligned.
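
To see why padding matters, consider a minimal sketch of a strided split across ranks. This README does not state how dataio partitions or pads records, so `shard_indices` is a hypothetical helper; it pads by wrapping around to the first samples, similar in spirit to PyTorch's `DistributedSampler`, and works on samples rather than batches.

```python
import math


def shard_indices(n, rank, world_size, pad=True):
    """Return the sample indices assigned to `rank` under a strided split.

    Assumes n >= world_size so that wrap-around padding has enough samples.
    """
    indices = list(range(n))
    if pad and n % world_size:
        # Wrap around to the start so every rank gets ceil(n / world_size) items.
        total = math.ceil(n / world_size) * world_size
        indices += indices[: total - n]
    return indices[rank::world_size]


per_rank = [shard_indices(10, rank, 4) for rank in range(4)]
unpadded = [shard_indices(10, rank, 4, pad=False) for rank in range(4)]
```

With 10 samples on 4 ranks, the unpadded split leaves two ranks one sample short, which in a real job can leave the other ranks waiting in a collective operation at the end of the epoch.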

## Direct IO

Flat verbs for scripts, evaluation, and one-shot reads:

```python
dataio.head("s3://bucket/file.bin")
chunk = dataio.read("s3://bucket/file.bin", offset=0, size=1 << 20)
etag  = dataio.write("s3://bucket/out.bin", chunk)

dataio.exists("s3://bucket/out.bin")
dataio.ls("s3://bucket/prefix/")
dataio.glob("s3://bucket/**/*.safetensors")
```

Composable handles for repeated operations on one source:

```python
src = dataio.reader("s3://bucket/shard.tar.gz")
payload = src.gunzip().tar_entry("0001.jpg").read_all()

with dataio.writer("s3://bucket/checkpoint.bin") as w:
    w.write_chunks(huge_generator())
```

Credentials and endpoints are configured once per scheme:

```python
dataio.configure_credentials("s3", method="default")  # platform chain
dataio.configure_credentials("r2", access_key_id="...", secret_access_key="...")
dataio.configure_endpoint("r2", "https://<account>.r2.cloudflarestorage.com")
```

`method="default"` resolves through env vars, shared profile, instance
metadata, or workload identity, whichever the platform exposes.

## Format helpers

```python
arr   = dataio.read_numpy("/data/x.npy")
image = dataio.read_image("s3://bucket/image.png", mode="rgb")
cols  = dataio.read_parquet("/data/shard.parquet", columns=["key", "height"])
rows  = (
    dataio.scan_parquet("/data/shard.parquet")
    .filter(lambda r: r["height"] > 512)
    .collect()
)

for example in dataio.iter_tfrecord("/data/train.tfrecord"):
    label = example["label"]
```

### safetensors

Zero-copy mmap reads for both single files and sharded checkpoints:

```python
dataio.write_safetensors(
    "/fsx/ckpt-step-12000/",
    state_dict,
    max_shard_size="5GB",
    metadata={"format": "pt"},
)

state_dict = dataio.read_safetensors("/fsx/ckpt-step-12000/", framework="torch")

with dataio.open_safetensors("/fsx/model.safetensors") as st:
    tensor = st.read(key="layer.weight", framework="torch")
```

On Linux hosts with CUDA and a GDS-ready filesystem, reads land directly
in device memory:

```python
state_dict = dataio.read_safetensors(
    "/fsx/ckpt-step-12000/",
    framework="torch",
    device="cuda:0",
)
```

`dataio.gds_available()` and `dataio.gds_info()` report the active state.

## API at a glance

| API | Purpose |
|---|---|
| `pipeline`, `slot`, `slots`, `chain`, `batch`, `collate`, `group_by` | pipeline DSL |
| `map`, `custom_op` | Python extension hooks |
| `DataLoader` | synchronous batch iterator |
| `read` / `write` / `head` / `stream` / `ls` / `glob` / `exists` / `delete` | URI bytes |
| `reader`, `writer` | composable IO handles |
| `from_callable`, `from_iterable`, `from_iterator`, `from_sampler`, `files`, `urls`, `manifest`, `webdataset`, `archive` | record sources |
| `read_safetensors`, `write_safetensors`, `open_safetensors` | safetensors |
| `read_numpy`, `read_image`, `read_parquet`, `scan_parquet`, `iter_tfrecord`, `read_hdf5`, `open_hdf5` | format helpers |
| `configure_credentials`, `configure_endpoint`, `configure_transport` | auth and transport |
| `gds_open`, `gds_register_buffer`, `gds_read_into_tensor` | low-level GDS |

## Documentation

- [`docs/arch.md`](docs/arch.md) — architecture, runtime IR, zero-copy
  contracts, DLPack ownership, security.
- [`docs/benchmark.md`](docs/benchmark.md) — benchmark commands,
  regression gates, full cross-framework matrix.
- [`examples/`](examples/) — runnable end-to-end scripts. Start with
  [`examples/multi_slot_pipeline.py`](examples/multi_slot_pipeline.py).

## Development

The Rust toolchain is pinned in `rust-toolchain.toml`. Editable build:

```bash
uv run --no-sync --with maturin maturin develop --profile staging
```

Tests:

```bash
cargo test
uv run --no-sync python -m pytest python/tests -q
```

Wheel:

```bash
uv run --no-sync --with maturin maturin build --profile staging --out dist
```

## Performance

![Cross-framework throughput on a DiT-style decode pipeline](docs/benchmarks/dit-multiframework-2026-05-28.png)

Benchmark: a decode pipeline (variable-size JPEG → per-record bucket →
symmetric CHW normalize) on a 32-core AMD EPYC 9R14, CPU only. Values
are median samples/s; higher is better.

| Workers | dataio | spdl | torchvision-io | torch-dataloader | ray-data |
|---:|---:|---:|---:|---:|---:|
|  1 |   **576** |   184 |   208 |   141 |   146 |
|  4 | **2,107** |   715 |   565 |   395 |   249 |
|  8 | **3,115** | 1,295 |   690 |   506 |   243 |
| 16 | **3,122** | 2,175 |   575 |   494 |   225 |

Full matrix, scaling charts, and reproduce commands in
[docs/benchmark.md](docs/benchmark.md).
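
For a quick read of the table, the ratios can be worked out directly. The snippet below only does arithmetic on the numbers copied from the table above; it does not rerun any benchmark.

```python
# Median samples/s, copied from the table above (workers -> framework -> rate).
throughput = {
    1: {"dataio": 576, "spdl": 184, "torchvision-io": 208,
        "torch-dataloader": 141, "ray-data": 146},
    4: {"dataio": 2107, "spdl": 715, "torchvision-io": 565,
        "torch-dataloader": 395, "ray-data": 249},
    8: {"dataio": 3115, "spdl": 1295, "torchvision-io": 690,
        "torch-dataloader": 506, "ray-data": 243},
    16: {"dataio": 3122, "spdl": 2175, "torchvision-io": 575,
         "torch-dataloader": 494, "ray-data": 225},
}

# Speedup of dataio over the fastest other loader at each worker count.
lead = {}
for workers, row in throughput.items():
    best_other = max(rate for name, rate in row.items() if name != "dataio")
    lead[workers] = round(row["dataio"] / best_other, 2)

# dataio throughput relative to its own single-worker rate.
base = throughput[1]["dataio"]
scaling = {w: round(row["dataio"] / base, 2) for w, row in throughput.items()}
```

On these numbers the lead over the next-fastest loader goes from about 2.8x at one worker to about 1.4x at 16, and dataio's own throughput is essentially flat between 8 and 16 workers (3,115 vs 3,122 samples/s).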

## License

Apache-2.0. Source: <https://github.com/Mikubill/dataio-rs>. PyPI:
<https://pypi.org/project/dataio-rs/>.

