Metadata-Version: 2.4
Name: deltacat-client
Version: 0.1.5
Summary: Lightweight REST client and thin MCP scaffolding for the DeltaCAT API server.
Project-URL: Homepage, https://github.com/ray-project/deltacat
Author: Ray Team
License-Expression: Apache-2.0
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: <3.13,>=3.11
Requires-Dist: attrs>=22.2.0
Requires-Dist: deltacat-io-core~=0.1
Requires-Dist: httpx<0.29.0,>=0.23.0
Requires-Dist: python-dateutil>=2.8.0
Requires-Dist: requests>=2.31
Provides-Extra: all
Requires-Dist: deltacat-io-core[all]~=0.1; extra == 'all'
Provides-Extra: daft
Requires-Dist: deltacat-io-core[daft]~=0.1; extra == 'daft'
Provides-Extra: io
Requires-Dist: deltacat-io-core[io]~=0.1; extra == 'io'
Provides-Extra: lance
Requires-Dist: deltacat-io-core[lance]~=0.1; extra == 'lance'
Provides-Extra: mcp
Requires-Dist: mcp>=1.0; extra == 'mcp'
Provides-Extra: pandas
Requires-Dist: deltacat-io-core[pandas]~=0.1; extra == 'pandas'
Provides-Extra: polars
Requires-Dist: deltacat-io-core[polars]~=0.1; extra == 'polars'
Description-Content-Type: text/markdown

<p align="center">
  <img src="https://github.com/ray-project/deltacat/raw/2.0/media/deltacat-logo-alpha-750.png" alt="deltacat logo" style="width:55%; height:auto; text-align: center;">
</p>

`deltacat-client` is the primary Python client package for
[DeltaCAT](https://github.com/ray-project/deltacat). It lets you read and
write tables, run jobs, and build data pipelines against a DeltaCAT API
server without installing the full storage, compute, or server runtime
stack.

The client talks to a DeltaCAT server over HTTP. Metadata operations
(schema validation, transaction management, compaction) stay server-side,
while large data reads and writes go directly between the client and cloud
storage using short-lived credentials the server vends on demand.


## Overview

The client is organized around a root `Client` object with resource-oriented subclients:

| Subclient | Purpose |
|-----------|---------|
| `client.catalog` | Namespaces, tables, read/write, transactions |
| `client.jobs` | Job submission, claiming, lifecycle, progress |
| `client.publications` | Incremental producers into the DeltaCAT Lakehouse (pipeline root nodes) |
| `client.subscriptions` | Incremental consumers from the DeltaCAT Lakehouse (pipeline leaf nodes) |
| `client.transforms` | Incremental transformations within the DeltaCAT Lakehouse (pipeline intermediate nodes) |
| `client.pipelines` | Wire publications, transforms, and subscriptions into a connected DAG |


## Installation

```bash
pip install deltacat-client
```

Optional extras for local data materialization:

```bash
pip install "deltacat-client[all]"    # Full client-side read/write stack
pip install "deltacat-client[pandas]" # Pandas DataFrame support
pip install "deltacat-client[polars]" # Polars DataFrame support
pip install "deltacat-client[daft]"   # Daft DataFrame support
pip install "deltacat-client[lance]"  # Lance dataset support
pip install "deltacat-client[mcp]"    # Typed async MCP HTTP client
```


## Getting Started

Before using the client, you need a running DeltaCAT API server. See
[Server Setup](docs/server-setup.md) for instructions.

DeltaCAT lets you manage **Tables** across one or more **Catalogs**. A
**Table** is a named collection of data files. A **Catalog** is a named
data lake that contains tables. For the full data model, see the
[DeltaCAT README](../README.md#getting-started).

### Quick Start

```python
from deltacat_client import Client
import pyarrow as pa

# Connect to a DeltaCAT server
client = Client("http://localhost:8080")

# Write data to a table (table is created automatically)
data = pa.table({
    "id": [1, 2, 3],
    "name": ["Cheshire", "Dinah", "Felix"],
    "age": [3, 7, 5],
})
client.catalog.write(data, table="cool_cats", mode="auto", format="parquet")

# Read the data back
df = client.catalog.read(table="cool_cats", read_as="pandas")
print(df)
```

### Core Concepts

Expand the sections below to see examples of core client operations.

<details>

<summary><b>Authentication</b></summary>

When connecting to a production server with auth enabled, provide a bearer token:

```python
from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    bearer_token="your-api-token",
)
```

Admins can also onboard a new user, grant their initial access, and receive a
one-time token to share over a secure out-of-band channel:

```python
from deltacat_client import Client

admin = Client(
    "https://deltacat-api.example.com",
    bearer_token="admin-api-token",
)

created = admin.auth.create_user(
    user_id="newadmin@example.com",
    display_name="New Admin",
    email="newadmin@example.com",
    initial_role="ADMIN",
    resource_type="catalog",
    resource_name="*",
    issue_token_label="bootstrap",
    idempotency_key="create-newadmin-001",
)

bootstrap_token = created.token.token
print("Share this token once via a secure secret channel:", bootstrap_token)
```

The server validates your token and maps it to a user identity for permission checks (read, write, admin). The client automatically includes the token on every request. For data file access, the server vends short-lived STS credentials that the client uses to read from and write to cloud storage directly.
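Under the hood, bearer auth is plain HTTP: the client attaches the token to each request in a standard `Authorization` header. A minimal sketch of the header the client sends (`build_auth_headers` is an illustrative helper for this README, not part of the client API):

```python
def build_auth_headers(bearer_token: str) -> dict[str, str]:
    # Standard HTTP bearer scheme (RFC 6750): "Authorization: Bearer <token>"
    return {"Authorization": f"Bearer {bearer_token}"}

headers = build_auth_headers("your-api-token")
print(headers["Authorization"])  # Bearer your-api-token
```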

See [Configuration](docs/configuration.md) for the detailed auth model, bootstrap tokens, and role-based access control.

</details>

<details>

<summary><b>Reading Data</b></summary>

```python
# Read a table as a PyArrow table (the default format)
arrow_table = client.catalog.read(namespace="robotics", table="episodes")

# Read as Pandas, Polars, or Daft
df = client.catalog.read(namespace="robotics", table="episodes", read_as="pandas")
polars_result = client.catalog.read(namespace="robotics", table="episodes", read_as="polars")
daft_df = client.catalog.read(namespace="robotics", table="episodes", read_as="daft")

# Filter rows and limit results
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    filter_predicate={"eq": ["task", "pick_screwdriver"]},
    limit=5000,
)

# Time travel: read the table as it existed at a prior point in time.
# The as_of value is a nanosecond-precision Unix epoch timestamp.
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    as_of=1712697600000000000,
)
```
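Since `as_of` is just nanoseconds since the Unix epoch, you can derive it from a regular `datetime` instead of hand-writing the integer. A small helper (illustrative; not part of the client API — integer arithmetic is used because float seconds lose precision at nanosecond scale):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_unix_nanos(dt: datetime) -> int:
    # Floor-divide the timedelta by one microsecond to get exact integer
    # microseconds, then scale to nanoseconds.
    return (dt - EPOCH) // timedelta(microseconds=1) * 1_000

as_of = to_unix_nanos(datetime(2025, 1, 15, tzinfo=timezone.utc))
# Pass the result to client.catalog.read(..., as_of=as_of)
```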

See [Reading and Writing](docs/reading-and-writing.md) for scalable reads,
lazy vs. eager materialization, and format-specific behavior.
For PackDS-backed training tables, see [Training Data](docs/training-data.md).

</details>

<details>

<summary><b>Writing Data</b></summary>

The client supports writing PyArrow tables, Pandas DataFrames, Polars DataFrames, Daft DataFrames, NumPy arrays, Ray Datasets, and local files (Parquet, CSV, TSV, PSV, Feather, JSON, ORC, AVRO, Lance).

```python
import pyarrow as pa

# Write data to a table.
# The table and namespace are created automatically when mode is "auto".
data = pa.table({"episode_id": [1, 2], "score": [0.95, 0.87]})
client.catalog.write(data, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Append more data
data2 = pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]})
client.catalog.write(data2, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Write a Pandas DataFrame
import pandas as pd
df = pd.DataFrame({"episode_id": [5], "score": [0.93]})
client.catalog.write(df, namespace="robotics", table="predictions", mode="auto", format="parquet")

# For precise control over schema, partitioning, and sort order,
# create the table explicitly before writing.
client.catalog.create_table(
    namespace="robotics",
    table="scored_episodes",
    schema=pa.schema([
        pa.field("episode_id", pa.int64()),
        pa.field("score", pa.float64()),
        pa.field("episode_day", pa.string()),
    ]),
    partition_scheme={"keys": [{"key": ["episode_day"], "transform": "identity"}]},
    sort_scheme={"keys": [{"key": ["episode_id"], "sort_order": "descending"}]},
    auto_create_namespace=True,
)

data = pa.table({
    "episode_id": [1, 2],
    "score": [0.95, 0.87],
    "episode_day": ["2025-01-15", "2025-01-15"],
})
client.catalog.write(data, namespace="robotics", table="scored_episodes", mode="auto", format="parquet")

# Evolve a table's schema after creation
client.catalog.alter_table(
    namespace="robotics",
    table="predictions",
    schema_updates={
        "operations": [
            {"op": "add", "field": {"name": "confidence", "type": "float64"}}
        ]
    },
)
```

See [Reading and Writing](docs/reading-and-writing.md) for write modes,
staged writes, path-based writes with `create_if_missing`, and the full
list of supported formats and schema inputs.
For PackDS training data writes, see [Training Data](docs/training-data.md).

</details>

<details>

<summary><b>Transactions</b></summary>

Transactions provide atomic multi-step operations with automatic heartbeating and rollback on failure.

```python
import pyarrow as pa

with client.transaction(commit_message="Backfill predictions") as tx:
    client.catalog.write(
        pa.table({"episode_id": [10, 11], "score": [0.88, 0.92]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
    )

    # Reads within the transaction see uncommitted writes
    df = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
    )
    print(f"Rows visible in transaction: {len(df)}")
# Transaction commits automatically on exit; aborts on exception
```
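The commit-on-exit, abort-on-exception behavior follows standard Python context-manager semantics. A toy illustration of the pattern (`ToyTransaction` is a stand-in for this README only; the real object comes from `client.transaction(...)`):

```python
class ToyTransaction:
    """Illustrates commit on clean exit, abort on exception."""

    def __init__(self):
        self.state = "open"

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Commit only if the with-block raised no exception; otherwise abort.
        self.state = "committed" if exc_type is None else "aborted"
        return False  # never swallow the exception

tx = ToyTransaction()
with tx:
    pass  # all writes succeed
print(tx.state)  # committed

tx2 = ToyTransaction()
try:
    with tx2:
        raise RuntimeError("write failed")
except RuntimeError:
    pass
print(tx2.state)  # aborted
```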

See [Transactions](docs/transactions.md) for time-travel reads, manual commit/abort, and transaction rules.

</details>

<details>

<summary><b>Jobs</b></summary>

DeltaCAT uses a durable job system for background work (compaction, data relay, subscription processing). The client can submit, monitor, and execute jobs.

```python
# List all jobs
jobs = client.jobs.list()
for job in jobs:
    print(f"{job.job_id}: {job.state}")

# Submit a compaction job
result = client.jobs.submit_compaction(table="predictions")
print(f"Submitted: {result.job_id}")

# Wait for it to complete
status = client.jobs.wait(result.job_id, timeout_seconds=120)
print(f"Final state: {status.state}")
```

Workers claim and execute jobs. Any process can act as a worker:

```python
# Claim the next pending job matching our worker tags
job = client.jobs.claim(worker_tags=["subscriber"])
if job:
    print(f"Claimed {job.job_id} (type: {job.context.get('job_type')})")

    # Report fine-grained progress and declare the deadline for the next chunk
    client.jobs.emit_event(
        job,
        event_name="batch_started",
        completed=1,
        expected=4,
        metadata={"batch": 1},
        heartbeat_timeout_seconds=300,
    )

    # Do the work...

    # Mark complete. Source-consuming jobs must include the advanced
    # watermark so the server can persist progress correctly.
    client.jobs.complete(
        job,
        records_processed=1000,
        watermark={
            "partition_watermarks": {"analytics.events": 42},
            "known_partitions": ["analytics.events"],
        },
    )
```
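The watermark payload is a plain mapping from fully qualified partition name to the highest position processed so far, and the key invariant when computing it is monotonicity: a partition's watermark must never move backwards. A sketch of that merge (illustrative helper only; the authoritative bookkeeping is server-side):

```python
def advance_watermark(current: dict[str, int], processed: dict[str, int]) -> dict:
    merged = dict(current)
    for partition, position in processed.items():
        # Monotonic advance: keep the max of stored and newly processed positions.
        merged[partition] = max(merged.get(partition, 0), position)
    return {
        "partition_watermarks": merged,
        "known_partitions": sorted(merged),
    }

state = advance_watermark({"analytics.events": 40}, {"analytics.events": 42})
print(state["partition_watermarks"])  # {'analytics.events': 42}
```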

See [Jobs and Workers](docs/jobs-and-workers.md) for job types, worker
routing, heartbeat rules, retry semantics, and dispatch modes.

</details>

<details>

<summary><b>Publications</b></summary>

Publications are incremental producers that write new data into the
DeltaCAT Lakehouse. They sit at the root of a pipeline DAG and can be
triggered manually or fired by an upstream event.

```python
# Create a publication that writes to a sink table
client.publications.create(
    publication_id="episode_publisher",
    name="Episode Publisher",
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="local",
)

# Run the publication
result = client.publications.run("episode_publisher")
print(f"Published: {result}")
```

See [Pipelines](docs/pipelines.md) for publication configuration and DAG construction.

</details>

<details>

<summary><b>Subscriptions</b></summary>

Subscriptions are incremental consumers of DeltaCAT tables. They sit at
the leaves of a pipeline DAG, tracking a per-partition watermark so each
run picks up only new data.

```python
# Create a subscription that watches for new data in "raw_episodes"
client.subscriptions.create(
    subscriber_id="episode_processor",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)

# Trigger processing (dispatches a job to a subscriber worker)
client.subscriptions.trigger("episode_processor")

# Check watermark state
wm = client.subscriptions.get_watermark("episode_processor")
print(f"Watermark: {wm.watermark}")

# Pause / resume / delete
client.subscriptions.pause("episode_processor")
client.subscriptions.resume("episode_processor")
client.subscriptions.delete("episode_processor")
```

See [Pipelines](docs/pipelines.md) for subscription modes (delta vs. version), triggers, and redrive.

</details>

<details>

<summary><b>Transforms</b></summary>

Transforms are the intermediate nodes of a pipeline DAG. Each transform
reads from one or more source tables, applies processing logic, and
writes to one or more sink tables.

```python
# Create a transform: raw_episodes -> clean_episodes
client.transforms.create(
    transform_id="episode_cleaner",
    name="Episode Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="custom",
)

# Trigger transform processing
client.transforms.trigger("episode_cleaner")

# Pause / resume
client.transforms.pause("episode_cleaner")
client.transforms.resume("episode_cleaner")
```

See [Pipelines](docs/pipelines.md) for transform configuration, redrive, and rollback.

</details>

<details>

<summary><b>Pipelines</b></summary>

Pipelines wire publications, transforms, and subscriptions into a
connected DAG. When an upstream node completes, downstream nodes are
triggered automatically.

```python
# First, create connected pipeline nodes
client.publications.create(
    publication_id="ingest_pub",
    name="Raw Ingest Publisher",
    sink_tables=[{"namespace": "robotics", "table": "raw_data"}],
    dispatch_mode="local",
)
client.transforms.create(
    transform_id="clean",
    name="Data Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_data"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_data"}],
    dispatch_mode="custom",
)
client.subscriptions.create(
    subscriber_id="consume_clean",
    source_tables=[{"namespace": "robotics", "table": "clean_data"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)

# Preview the connected pipeline from an interior seed node
preview = client.pipelines.discover(seed_node_ids=["clean"])
print(preview.execution_order)

# Option A: persist exactly the previewed node_ids
client.pipelines.create(
    pipeline_id="etl_pipeline_pinned",
    name="ETL Pipeline (Pinned)",
    node_ids=preview.node_ids,
)

# Option B: create directly from seed node ids
client.pipelines.create(
    pipeline_id="etl_pipeline_seeded",
    name="ETL Pipeline (Seeded)",
    seed_node_ids=["clean"],
)

# Check pipeline status
status = client.pipelines.status("etl_pipeline_seeded")

# Pause / resume all nodes at once
client.pipelines.pause("etl_pipeline_seeded")
client.pipelines.resume("etl_pipeline_seeded")
```

See [Pipelines](docs/pipelines.md) for DAG construction, discovery
semantics, redrive, rollback, and stored-order validation.

</details>

<details>

<summary><b>Data Placement and Replication</b></summary>

DeltaCAT catalogs can span multiple storage backends (S3, SwiftStack, Lustre). Data placement lets you replicate tables across roots so readers access data from the closest location.

```python
# Place a table on an additional storage root for replication
client.catalog.place(
    namespace="robotics",
    table="episodes",
    roots=["aws_s3_iad"],  # Replicate to this root
    backfill=True,         # Copy existing data too
)

# Check replication status
status = client.catalog.replication_status(namespace="robotics", table="episodes")
print(f"Roots: {status}")

# Read from the closest root (server resolves automatically)
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    root="aws_s3_iad",          # Prefer this root for file paths
)

# Remove a replication target
client.catalog.unplace(namespace="robotics", table="episodes", root="aws_s3_iad")
```

New writes are automatically replicated to all placed roots via a background subscriber. Reads with `root=` get file paths rewritten through the preferred root when data is available there.
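Conceptually, a root-preferred read is a path rewrite: the server swaps the file path's current root prefix for the preferred root's prefix when a replica exists there. A toy illustration of the idea (the root names and URI prefixes below are invented for this example):

```python
# Hypothetical mapping from root name to storage URI prefix.
ROOT_URIS = {
    "aws_s3_pdx": "s3://deltacat-pdx/lake",
    "aws_s3_iad": "s3://deltacat-iad/lake",
}

def rewrite_to_root(path: str, current_root: str, preferred_root: str) -> str:
    # Swap the current root's URI prefix for the preferred root's prefix.
    old, new = ROOT_URIS[current_root], ROOT_URIS[preferred_root]
    if path.startswith(old):
        return new + path[len(old):]
    return path  # not under the current root; leave untouched

print(rewrite_to_root(
    "s3://deltacat-pdx/lake/robotics/episodes/part-0.parquet",
    "aws_s3_pdx",
    "aws_s3_iad",
))  # s3://deltacat-iad/lake/robotics/episodes/part-0.parquet
```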

See [Configuration](docs/configuration.md) for data root setup and multi-root catalog configuration.

</details>

<details>

<summary><b>Scheduled Processing</b></summary>

Subscriptions and transforms can run on a schedule instead of being triggered manually. DeltaCAT supports interval-based and cron-based scheduling.

```python
# Process new data every 5 minutes
client.subscriptions.create(
    subscriber_id="metrics_ingester",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"interval_seconds": 300}},
)

# Process at 2am UTC daily using a cron expression
client.subscriptions.create(
    subscriber_id="nightly_aggregator",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"cron": "0 2 * * *", "timezone": "UTC"}},
)

# Event-driven: only runs when triggered manually or by an upstream pipeline node
client.subscriptions.create(
    subscriber_id="on_demand_processor",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "event"},
)
```

See [Configuration](docs/configuration.md) for trigger options and scheduling details.

</details>


## Agentic Access (MCP)

DeltaCAT ships with a built-in [Model Context
Protocol](https://modelcontextprotocol.io) server so AI agents (for
example, Claude Code) can browse catalogs, inspect schemas, plan reads,
and stage writes through natural language instead of hand-written Python.

Most hand-written code should just use the REST `Client(...)` shown
above. Reach for MCP when you want to:

- let an agent explore and operate on your catalog conversationally
- embed DeltaCAT as a tool in an agentic application
- use a typed async Python wrapper over the MCP HTTP surface
  (`deltacat-client[mcp]`) from code that is already agentic in shape

See the [MCP Server guide](../deltacat/docs/mcp/README.md) for the full
tool reference, the typed async client, and recipes for agent-driven
catalog workflows.


## Server Setup

The DeltaCAT client connects to a DeltaCAT API server. For setup instructions, see:

- **[Server Setup Guide](docs/server-setup.md)** -- start a local or production server
- **[REST API Reference](../deltacat/docs/server/README.md)** -- full REST endpoint documentation
- **[MCP Server Guide](../deltacat/docs/mcp/README.md)** -- agentic access via MCP


## Additional Resources

| Guide | Description |
|-------|-------------|
| [Reading and Writing](docs/reading-and-writing.md) | Read plans, write modes, staged writes, supported formats |
| [Training Data](docs/training-data.md) | PackDS tables, episode indexes, shard manifests, distributed training |
| [Transactions](docs/transactions.md) | Transaction lifecycle, time travel, rules and limitations |
| [Jobs and Workers](docs/jobs-and-workers.md) | Job types, claiming, heartbeat, worker routing, authentication |
| [Pipelines](docs/pipelines.md) | Publications, transforms, subscriptions, DAGs, redrive |
| [Configuration](docs/configuration.md) | Auth, dispatch modes, triggers, data placement |
| [Maintainer Workflow](docs/maintainer-workflow.md) | Relationship between REST and MCP, generated bindings, facade updates, and validation guards |
| [Architecture](docs/architecture.md) | Package boundary, generated client, development notes |
| [Client Compatibility Runbook](../deltacat/docs/deploy/CLIENT_COMPATIBILITY_RUNBOOK_2026-04-12.md) | Promotion-time packaged client/server compatibility validation |

For the core DeltaCAT data model, storage architecture, and catalog APIs, see the [DeltaCAT documentation](../README.md#overview).
