Metadata-Version: 2.4
Name: deltacat-client
Version: 0.1.1
Summary: Lightweight REST client for the DeltaCAT API server.
Project-URL: Homepage, https://github.com/ray-project/deltacat
Author: Ray Team
License-Expression: Apache-2.0
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.11
Requires-Python: <3.12,>=3.11
Requires-Dist: attrs>=22.2.0
Requires-Dist: deltacat-io-core==0.1.1
Requires-Dist: httpx<0.29.0,>=0.23.0
Requires-Dist: python-dateutil>=2.8.0
Requires-Dist: requests>=2.31
Provides-Extra: all
Requires-Dist: deltacat-io-core[all]==0.1.1; extra == 'all'
Provides-Extra: io
Requires-Dist: deltacat-io-core[io]==0.1.1; extra == 'io'
Provides-Extra: lance
Requires-Dist: deltacat-io-core[lance]==0.1.1; extra == 'lance'
Provides-Extra: pandas
Requires-Dist: deltacat-io-core[pandas]==0.1.1; extra == 'pandas'
Provides-Extra: polars
Requires-Dist: deltacat-io-core[polars]==0.1.1; extra == 'polars'
Description-Content-Type: text/markdown

<p align="center">
  <img src="https://github.com/ray-project/deltacat/raw/2.0/media/deltacat-logo-alpha-750.png" alt="deltacat logo" style="width:55%; height:auto; text-align: center;">
</p>

`deltacat-client` is the Python REST client for [DeltaCAT](https://github.com/ray-project/deltacat). It lets you read and write tables, manage jobs, and build data pipelines on a DeltaCAT API server -- without installing the full storage, compute, or server runtime stack.

The client talks to a DeltaCAT server over HTTP. It reads and writes data files directly in cloud storage (S3, GCS, Azure) using server-vended credentials, while metadata operations (schema validation, transaction management, compaction triggers) stay server-side.


## Overview

The client is organized around a root `Client` object with resource-oriented subclients:

| Subclient | Purpose |
|-----------|---------|
| `client.catalog` | Namespaces, tables, read/write, transactions |
| `client.jobs` | Job submission, claiming, lifecycle, progress |
| `client.subscriptions` | Incremental data processing pipelines |
| `client.publications` | Data publishing workflows |
| `client.transforms` | Source-to-sink data transformations |
| `client.pipelines` | Multi-stage pipeline orchestration |
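
All of these hang off a single `Client` instance, so one connection serves every API area:

```python
from deltacat_client import Client

client = Client("http://localhost:8080")

client.catalog        # namespaces, tables, reads/writes, transactions
client.jobs           # job submission, claiming, lifecycle, progress
client.subscriptions  # incremental data processing pipelines
```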


## Installation

```bash
pip install deltacat-client
```

Optional extras for local data materialization:

```bash
pip install "deltacat-client[all]"    # Full client-side read/write stack
pip install "deltacat-client[pandas]" # Pandas DataFrame support
pip install "deltacat-client[polars]" # Polars DataFrame support
pip install "deltacat-client[lance]"  # Lance dataset support
```


## Getting Started

Before using the client, you need a running DeltaCAT API server. See [Server Setup](docs/server-setup.md) for instructions.

### Quick Start

```python
from deltacat_client import Client
import pyarrow as pa

# Connect to a DeltaCAT server
client = Client("http://localhost:8080")

# Write data to a table (table is created automatically)
data = pa.table({
    "id": [1, 2, 3],
    "name": ["Cheshire", "Dinah", "Felix"],
    "age": [3, 7, 5],
})
client.catalog.write(data, namespace="default", table="cool_cats", mode="auto", format="parquet")

# Read the data back
df = client.catalog.read(namespace="default", table="cool_cats", read_as="pandas")
print(df)
```

### Core Concepts

DeltaCAT lets you manage **Tables** across one or more **Catalogs**. A **Table** is a named collection of data files. A **Catalog** is a named data lake that contains tables. For more on DeltaCAT's data model, see the [DeltaCAT README](https://github.com/ray-project/deltacat/tree/2.0#getting-started).

Expand the sections below to see examples of core client operations.

<details>

<summary><b>Authentication</b></summary>

When connecting to a production server with auth enabled, provide a bearer token:

```python
from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    bearer_token="your-api-token",
)
```

The server validates your token and maps it to a user identity for permission checks (read, write, admin). The client automatically includes the token on every request. For data file access, the server vends short-lived STS credentials that the client uses to read from and write to cloud storage directly.
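
To keep tokens out of source code, you can read them from the environment first; a minimal sketch (the `DELTACAT_API_TOKEN` variable name is illustrative, not something the client reads automatically):

```python
import os

from deltacat_client import Client

# Illustrative variable name; the client itself does not read env vars
token = os.environ["DELTACAT_API_TOKEN"]
client = Client("https://deltacat-api.example.com", bearer_token=token)
```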

This is different from thick DeltaCAT (`import deltacat as dc`), where authentication is handled entirely by the filesystem layer (e.g., AWS IAM roles, boto3 profiles).

See [Configuration](docs/configuration.md) for bearer token setup, trusted identity headers, and role-based access control.

</details>

<details>

<summary><b>Reading Data</b></summary>

```python
# Read a full table as a Pandas DataFrame
df = client.catalog.read(namespace="robotics", table="episodes", read_as="pandas")

# Read as PyArrow, Polars, or NumPy
arrow_table = client.catalog.read(namespace="robotics", table="episodes", read_as="pyarrow")
polars_df = client.catalog.read(namespace="robotics", table="episodes", read_as="polars")

# Filter and limit
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    filter_predicate={"eq": ["task", "pick_screwdriver"]},
    limit=5000,
)

# Read a specific table version (time travel)
df = client.catalog.read(namespace="robotics", table="episodes", read_as="pandas", as_of=1712697600000000000)
```

</details>

<details>

<summary><b>Writing Data</b></summary>

The client supports writing PyArrow tables, Pandas DataFrames, Polars DataFrames, Daft DataFrames, NumPy arrays, Ray Datasets, and local files (Parquet, CSV, TSV, PSV, Feather, JSON, ORC, AVRO, Lance).

```python
import pyarrow as pa

# Write data (creates the table if it doesn't exist)
data = pa.table({"episode_id": [1, 2], "score": [0.95, 0.87]})
client.catalog.write(data, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Append more data
data2 = pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]})
client.catalog.write(data2, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Write a Pandas DataFrame
import pandas as pd
df = pd.DataFrame({"episode_id": [5], "score": [0.93]})
client.catalog.write(df, namespace="robotics", table="predictions", mode="auto", format="parquet")
```
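
Local files can be written the same way; a sketch assuming `write()` accepts a filesystem path in place of an in-memory table (see [Reading and Writing](docs/reading-and-writing.md) for the exact call shapes):

```python
# Sketch: assumes write() accepts a local file path directly
client.catalog.write(
    "/tmp/episodes.parquet",  # hypothetical local Parquet file
    namespace="robotics",
    table="predictions",
    mode="auto",
    format="parquet",
)
```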

</details>

<details>

<summary><b>Transactions</b></summary>

Transactions provide atomic multi-step operations with automatic heartbeating and rollback on failure.

```python
import pyarrow as pa

with client.transaction(commit_message="Backfill predictions") as tx:
    client.catalog.write(
        pa.table({"episode_id": [10, 11], "score": [0.88, 0.92]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
    )

    # Reads within the transaction see uncommitted writes
    df = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
    )
    print(f"Rows visible in transaction: {len(df)}")
# Transaction commits automatically on exit; aborts on exception
```
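
Manual control is also possible; a sketch assuming the transaction object exposes `commit()` and `abort()` (method names are assumptions here; see [Transactions](docs/transactions.md) for the actual API):

```python
# Sketch: commit()/abort() method names are assumptions
tx = client.transaction(commit_message="Manual backfill")
try:
    # ... writes and reads as in the block above ...
    tx.commit()
except Exception:
    tx.abort()
    raise
```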

See [Transactions](docs/transactions.md) for time-travel reads, manual commit/abort, and transaction rules.

</details>

<details>

<summary><b>Jobs</b></summary>

DeltaCAT uses a durable job system for background work (compaction, data relay, subscription processing). The client can submit, monitor, and execute jobs.

```python
# List all jobs
jobs = client.jobs.list()
for job in jobs:
    print(f"{job.job_id}: {job.state}")

# Submit a compaction job
result = client.jobs.submit_compaction(namespace="default", table="predictions")
print(f"Submitted: {result.job_id}")

# Wait for it to complete
status = client.jobs.wait(result.job_id, timeout_seconds=120)
print(f"Final state: {status.state}")
```

Workers claim and execute jobs. Any process can act as a worker:

```python
# Claim the next pending job matching our worker tags
job = client.jobs.claim(worker_tags=["subscriber"])
if job:
    print(f"Claimed {job.job_id} (type: {job.context.get('job_type')})")

    # Report progress
    client.jobs.heartbeat(job.job_id, progress=0.5, message="Processing...")

    # Do the work...

    # Mark complete
    client.jobs.complete(job.job_id, records_processed=1000)
```
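
Putting those calls together, a long-running worker is just a polling loop; a minimal sketch built only from the calls shown above (the 5-second back-off is an arbitrary choice):

```python
import time

# Minimal worker loop: claim, report progress, complete, then poll again
while True:
    job = client.jobs.claim(worker_tags=["subscriber"])
    if not job:
        time.sleep(5)  # nothing pending; back off before polling again
        continue
    client.jobs.heartbeat(job.job_id, progress=0.0, message="Starting")
    # ... do the work for this job type ...
    client.jobs.complete(job.job_id, records_processed=1000)
```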

See [Jobs and Workers](docs/jobs-and-workers.md) for job types, worker routing, authentication, and dispatch modes.

</details>

<details>

<summary><b>Subscriptions</b></summary>

Subscriptions track changes to source tables and process new data incrementally. They maintain a watermark so each run picks up only new data.

```python
# Create a subscription that watches for new data in "raw_episodes"
client.subscriptions.create(
    subscriber_id="episode_processor",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)

# Trigger processing (dispatches a job to a subscriber worker)
client.subscriptions.trigger("episode_processor")

# Check watermark state
wm = client.subscriptions.get_watermark("episode_processor")
print(f"Watermark: {wm.watermark}")

# Pause / resume / delete
client.subscriptions.pause("episode_processor")
client.subscriptions.resume("episode_processor")
client.subscriptions.delete("episode_processor")
```

See [Pipelines](docs/pipelines.md) for subscription modes (delta vs. version), triggers, and redrive.

</details>

<details>

<summary><b>Publications</b></summary>

Publications write processed data to sink tables. They can be triggered manually or wired into a pipeline after a subscription.

```python
# Create a publication that writes to a sink table
client.publications.create(
    publication_id="episode_publisher",
    name="Episode Publisher",
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="local",
)

# Run the publication
result = client.publications.run("episode_publisher")
print(f"Published: {result}")
```

See [Pipelines](docs/pipelines.md) for connecting publications to subscriptions.

</details>

<details>

<summary><b>Transforms</b></summary>

Transforms read from source tables, apply processing logic, and write to sink tables. They combine subscription (source tracking) with publication (sink writing) in a single node.

```python
# Create a transform: raw_episodes -> clean_episodes
client.transforms.create(
    transform_id="episode_cleaner",
    name="Episode Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="custom",
)

# Trigger transform processing
client.subscriptions.trigger("episode_cleaner")

# Pause / resume
client.transforms.pause("episode_cleaner")
client.transforms.resume("episode_cleaner")
```

See [Pipelines](docs/pipelines.md) for transform configuration, redrive, and rollback.

</details>

<details>

<summary><b>Pipelines</b></summary>

Pipelines wire subscriptions, transforms, and publications into a connected DAG. When an upstream node completes, downstream nodes are triggered automatically.

```python
# First, create the individual nodes
client.subscriptions.create(
    subscriber_id="ingest",
    source_tables=[{"namespace": "robotics", "table": "raw_data"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)
client.transforms.create(
    transform_id="clean",
    name="Data Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_data"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_data"}],
    dispatch_mode="custom",
)
client.publications.create(
    publication_id="publish",
    name="Data Publisher",
    sink_tables=[{"namespace": "robotics", "table": "final_data"}],
    dispatch_mode="local",
)

# Wire them into a pipeline (nodes are connected in DAG order)
client.pipelines.create(
    pipeline_id="etl_pipeline",
    name="ETL Pipeline",
    node_ids=["ingest", "clean", "publish"],
)

# Check pipeline status
status = client.pipelines.status("etl_pipeline")

# Pause / resume all nodes at once
client.pipelines.pause("etl_pipeline")
client.pipelines.resume("etl_pipeline")
```

See [Pipelines](docs/pipelines.md) for DAG construction, redrive, and pipeline operations.

</details>

<details>

<summary><b>Data Placement and Replication</b></summary>

DeltaCAT catalogs can span multiple storage backends (S3, SwiftStack, Lustre). Data placement lets you replicate tables across roots so readers access data from the closest location.

```python
# Place a table on an additional storage root for replication
client.catalog.place(
    namespace="robotics",
    table="episodes",
    roots=["aws_s3_iad"],       # Replicate to this root
    backfill=True,               # Copy existing data too
)

# Check replication status
status = client.catalog.replication_status(namespace="robotics", table="episodes")
print(f"Roots: {status}")

# Read from the closest root (server resolves automatically)
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    root="aws_s3_iad",          # Prefer this root for file paths
)

# Remove a replication target
client.catalog.unplace(namespace="robotics", table="episodes", roots=["aws_s3_iad"])
```

New writes are automatically replicated to all placed roots via a background subscriber. Reads with `root=` get file paths rewritten through the preferred root when data is available there.

See [Configuration](docs/configuration.md) for data root setup and multi-root catalog configuration.

</details>

<details>

<summary><b>Scheduled Processing</b></summary>

Subscriptions and transforms can run on a schedule instead of being triggered manually. DeltaCAT supports interval-based and cron-based scheduling.

```python
# Process new data every 5 minutes
client.subscriptions.create(
    subscriber_id="metrics_ingester",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"interval_seconds": 300}},
)

# Process at 2am UTC daily using a cron expression
client.subscriptions.create(
    subscriber_id="nightly_aggregator",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"cron": "0 2 * * *", "timezone": "UTC"}},
)

# Event-driven: only runs when triggered manually or by an upstream pipeline node
client.subscriptions.create(
    subscriber_id="on_demand_processor",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "event"},
)
```

The DeltaCAT server's built-in scheduler evaluates triggers periodically and dispatches jobs for subscriptions and transforms whose schedules are due. Paused nodes are skipped.

See [Configuration](docs/configuration.md) for trigger options and scheduling details.

</details>


## Server Setup

The DeltaCAT client connects to a DeltaCAT API server. For setup instructions, see:

- **[Server Setup Guide](docs/server-setup.md)** -- start a local or production server
- **[MCP Server for Claude Code](https://github.com/ray-project/deltacat/tree/2.0/deltacat/docs/mcp/README.md)** -- use DeltaCAT from Claude Code via MCP
- **[REST API Reference](https://github.com/ray-project/deltacat/tree/2.0/deltacat/docs/server/README.md)** -- full REST endpoint documentation


## Additional Resources

| Guide | Description |
|-------|-------------|
| [Reading and Writing](docs/reading-and-writing.md) | Read plans, write modes, staged writes, supported formats |
| [Transactions](docs/transactions.md) | Transaction lifecycle, time travel, rules and limitations |
| [Jobs and Workers](docs/jobs-and-workers.md) | Job types, claiming, heartbeat, worker routing, authentication |
| [Pipelines](docs/pipelines.md) | Subscriptions, publications, transforms, DAGs, redrive |
| [Configuration](docs/configuration.md) | Auth, dispatch modes, triggers, data placement |
| [Architecture](docs/architecture.md) | Package boundary, generated client, development notes |

For the core DeltaCAT data model, storage architecture, and catalog APIs, see the [DeltaCAT documentation](https://github.com/ray-project/deltacat/tree/2.0#overview).
