Metadata-Version: 2.4
Name: deltacat-client
Version: 0.1.0
Summary: Lightweight REST client for the DeltaCAT API server.
Project-URL: Homepage, https://github.com/ray-project/deltacat
Author: Ray Team
License-Expression: Apache-2.0
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.11
Requires-Python: <3.12,>=3.11
Requires-Dist: attrs>=22.2.0
Requires-Dist: deltacat-io-core==0.1.0
Requires-Dist: httpx<0.29.0,>=0.23.0
Requires-Dist: python-dateutil>=2.8.0
Requires-Dist: requests>=2.31
Provides-Extra: all
Requires-Dist: deltacat-io-core[all]==0.1.0; extra == 'all'
Provides-Extra: io
Requires-Dist: deltacat-io-core[io]==0.1.0; extra == 'io'
Provides-Extra: lance
Requires-Dist: deltacat-io-core[lance]==0.1.0; extra == 'lance'
Provides-Extra: pandas
Requires-Dist: deltacat-io-core[pandas]==0.1.0; extra == 'pandas'
Provides-Extra: polars
Requires-Dist: deltacat-io-core[polars]==0.1.0; extra == 'polars'
Description-Content-Type: text/markdown

# DeltaCAT Client

`deltacat-client` is the Python REST client for [DeltaCAT](https://github.com/ray-project/deltacat).

It provides a lightweight way to interact with a DeltaCAT API server from
external applications, workers, agents, and orchestration systems without
installing the full DeltaCAT storage, compute, or server runtime stack.

## Overview

The client is organized around a small root `Client` object with
resource-oriented subclients:

- `client.jobs` — job submission, claiming, lifecycle
- `client.catalog` — namespace/table CRUD, read planning, data placement
- `client.subscriptions` — incremental data processing pipelines
- `client.publications` — crawler and data publishing workflows
- `client.transforms` — source-to-sink data transformations
- `client.pipelines` — multi-stage pipeline orchestration

This `Client` facade wraps the lower-level REST bindings generated from
DeltaCAT's OpenAPI schema.
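
For example (a minimal sketch reusing calls shown later in this README; the
table and pipeline names are illustrative):

```python
from deltacat_client import Client

client = Client("http://localhost:8080")

# Each subclient owns one resource family.
tables = client.catalog.list_tables(namespace="robotics")   # catalog metadata
job = client.jobs.claim()                                    # worker job claim
status = client.pipelines.status("annotations-pipeline")     # pipeline health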

## Installation

```bash
uv pip install deltacat-client
```

`deltacat-client` depends on the shared `deltacat-io-core` package for local
plan execution and local file materialization. You normally install the client
extras below rather than installing `deltacat-io-core` directly.

Package boundary:

- `deltacat-client` depends on `deltacat-io-core`
- `deltacat-client` and `deltacat-io-core` are intended to work without the
  thick `deltacat` package being installed
- use the thick `deltacat` package only when you explicitly want the native
  catalog/storage/compute runtime or want to execute thin plans through the
  thick public API

Naming convention:

- package/distribution name: `deltacat-io-core`
- Python import module: `deltacat_io_core`

Optional extras:

- `deltacat-client[io]` for local plan execution and local file-based read/write helpers
- `deltacat-client[pandas]` for Pandas dataframe input to `client.catalog.write(...)`
- `deltacat-client[polars]` for Polars dataframe input and AVRO materialization
- `deltacat-client[lance]` for local Lance dataset read/write support
- `deltacat-client[all]` for the full client-side read/write UX stack

High-level `client.catalog.write(...)` supports:

- PyArrow tables
- Pandas DataFrames
- Polars DataFrames
- Daft DataFrames
- NumPy arrays
- Ray Datasets
- local Parquet / CSV / TSV / PSV / Feather / JSON / ORC / AVRO files
- local Lance dataset directories
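
For example, with the relevant extras installed, a Pandas DataFrame and a local
Parquet path can both be passed to the same call (a minimal sketch; the table
names are illustrative):

```python
import pandas as pd

from deltacat_client import Client

client = Client("http://localhost:8080")

# Pandas DataFrame input (requires deltacat-client[pandas]).
client.catalog.write(
    pd.DataFrame({"episode_id": [1, 2], "score": [0.95, 0.87]}),
    namespace="robotics",
    table="predictions",
    mode="add",
    format="parquet",
)

# A local Parquet file path is also accepted.
client.catalog.write(
    "/tmp/predictions.parquet",
    namespace="robotics",
    table="predictions",
    mode="add",
    format="parquet",
)
```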

## Quick Start

### First-Run Setup

Before using the client, ensure a DeltaCAT API server is running.
For local development:

```bash
pip install deltacat
python -m deltacat.server --catalog-root /tmp/my-catalog --transport http --port 8080
```

For an existing production server, get the base URL from your team
(e.g., `https://deltacat-api.example.com`).

### Connect and Explore

```python
from deltacat_client import Client

# Local development
client = Client("http://localhost:8080")

# Production (with auth)
client = Client(
    "https://deltacat-api.example.com",
    catalog="robotics-prod",
    user_id="researcher@example.com",
    bearer_token="...",
)

# Discover what's available
catalogs = client.catalog.list_catalogs()
namespaces = client.catalog.list_namespaces()
tables = client.catalog.list_tables(namespace="robotics")

# Inspect a table
desc = client.catalog.describe_table(namespace="robotics", table="episodes")
print(f"Schema: {desc.schema}")
print(f"Rows: {desc.total_records}")
```

### Read Data

```python
# Direct read: the client plans and executes inline
pandas_df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    filter_predicate={"eq": ["task", "pick_screwdriver"]},
    limit=5000,
    offset=0,
)

# Advanced: build a reusable read plan when you want to inspect or reuse it
plan = client.catalog.plan(namespace="robotics", table="episodes")
print(f"Files: {len(plan.all_files())}")
print(f"Partitions: {len(plan.partition_summaries())}")

# Execute the reusable plan directly through the client
reused_df = client.catalog.read(plan=plan, read_as="pandas")

# The thick deltacat package can also execute the same thin plan
import deltacat as dc
dataset = dc.read_table("episodes", namespace="robotics", plan=plan)
```

For schemaless tables, `client.catalog.read(...)` follows the same contract as
DeltaCAT itself and returns a flattened manifest table rather than the file
contents. The manifest rows include file paths and metadata such as content
type, sizes, and record counts. Materializing actual rows from a schemaless
table remains an explicit second step via
`client.catalog.from_manifest_table(...)`.

### Write Data

```python
import pyarrow as pa

# High-level write: the client manages staging and commit internally
result = client.catalog.write(
    pa.table({"episode_id": [1, 2], "score": [0.95, 0.87]}),
    namespace="robotics",
    table="predictions",
    mode="add",
    format="parquet",
)
```

### Transactions

Within a transaction context, supported catalog operations bind to the active
transaction automatically. You only need to pass `transaction=...` when you
want to override the ambient context explicitly.

```python
import pyarrow as pa

with client.transaction(commit_message="Create and backfill predictions") as tx:
    client.catalog.create_namespace(namespace="robotics_staging")

    client.catalog.create_table(
        namespace="robotics_staging",
        table="predictions",
        schema_def=[
            {"name": "episode_id", "type": "int64", "is_merge_key": True},
            {"name": "score", "type": "float64"},
        ],
        content_type="parquet",
        auto_create_namespace=False,
    )

    client.catalog.write(
        pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]}),
        namespace="robotics_staging",
        table="predictions",
        mode="add",
        format="parquet",
    )

    preview = client.catalog.read(
        namespace="robotics_staging",
        table="predictions",
        read_as="pandas",
    )
```

The transaction context manager heartbeats while it is open, commits on normal
exit, and aborts on exception.

Use an explicit transaction handle when you want to control commit and abort
yourself:

```python
import pyarrow as pa

tx = client.transaction(commit_message="Backfill prediction scores")
try:
    client.catalog.write(
        pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
        transaction=tx,
    )

    preview = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
        transaction=tx,
    )

    tx.commit()
except Exception:
    tx.abort(reason="backfill failed")
    raise
```

Historic transactions are also supported for time-travel reads:

```python
checkpoint_ns = 1712697600000000000

with client.transaction(as_of=checkpoint_ns) as historic_tx:
    historic_df = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
        transaction=historic_tx,
    )
```

An `as_of` transaction reads a historical snapshot and is read-only. Write
operations against a historic transaction are rejected.

Transaction-aware catalog APIs include:

- namespace operations:
  - `create_namespace(...)`
  - `get_namespace(...)`
  - `namespace_exists(...)`
  - `alter_namespace(...)`
  - `drop_namespace(..., purge=False)`
  - `list_namespaces(...)`
- table operations:
  - `create_table(...)`
  - `describe_table(...)`
  - `table_exists(...)`
  - `alter_table(...)`
  - `rename_table(...)`
  - `drop_table(..., purge=False)`
  - `list_tables(...)`
  - `list_table_versions(...)`
- data operations:
  - `plan(...)`
  - `read(...)`
  - `stage(...)`
  - `write(...)`
  - `commit(...)`

Transaction-bound mutation results include `transaction_id` and
`transaction_version` so callers can observe the authoritative server-side
version when needed.
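
For example (a minimal sketch; the result field names follow the text above):

```python
import pyarrow as pa

with client.transaction(commit_message="Observe commit version") as tx:
    result = client.catalog.write(
        pa.table({"episode_id": [5], "score": [0.5]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
    )
    # Server-assigned transaction metadata on the transaction-bound result.
    print(result.transaction_id, result.transaction_version)
```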

Fetched transactions and write sessions remain bound to their originating
catalog and transaction metadata:

```python
tx = client.transaction(catalog="robotics-prod", commit_message="stage files")
session = client.catalog.stage(
    namespace="robotics",
    table="predictions",
    mode="merge",
    format="parquet",
    transaction=tx,
)

fetched_session = client.catalog.get_write_session(session.session_id)
fetched_tx = client.catalog.get_transaction(tx.id)

attached = fetched_session.commit(
    entries=[{"path": "/tmp/output.parquet", "record_count": 1024}]
)

latest_tx = fetched_tx.get()
```

Important transaction rules:

- within `with client.transaction(...):`, supported catalog operations bind to
  the ambient transaction automatically
- explicit `transaction=...` overrides the ambient transaction, but it must
  belong to the same `Client` instance
- `catalog=...` is not allowed when an active transaction context is already in
  scope
- fetched write sessions and fetched transactions keep their bound catalog; a
  conflicting explicit `catalog=...` is rejected
- some catalog APIs intentionally remain non-transactional and raise if called
  inside a transaction context, including `write_data(...)`,
  `truncate_table(...)`, `rollback_table(...)`, placement APIs, retention, and
  langolier / compaction status helpers

Typical next steps after discovery are:

- inspect a specific table with `client.catalog.describe_table(...)`
- read directly with `client.catalog.read(...)`
- build a read plan with `client.catalog.plan(...)` when you want to inspect,
  page, cache, or reuse it
- create or manage orchestration resources such as subscriptions and pipelines

When used as a context manager, `client.transaction(...)` automatically
heartbeats in the background while the transaction is open and idle.

Likewise, a standalone `WriteSession` returned by `client.catalog.stage(...)`
can be used as a context manager to auto-heartbeat while files are being
written, and it will abort the session on exception.
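
For example (a minimal sketch that mirrors the `stage(...)` / `entry_for(...)` /
`commit(...)` calls shown under Staged Writes below):

```python
# Auto-heartbeat the staged session while files are written; the session is
# aborted automatically if an exception escapes the block.
with client.catalog.stage(
    namespace="robotics",
    table="embeddings",
    mode="merge",
    format="parquet",
) as session:
    # ... write embeddings/output.parquet under session.data_dir here ...
    entry = session.entry_for("embeddings/output.parquet", records=1024)
    client.catalog.commit(session=session, entries=[entry])
```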

When you want an explicit reusable plan, `plan(...)` supports:
- `filter_predicate`: structured JSON AST pushed into DeltaCAT partition pruning,
  file-stat pruning, and Polars row filtering
- `max_files` / `file_offset`: page very large plans instead of returning every
  file path in one response
- `include_column_stats`: opt in only when the client actually needs per-file
  statistics
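
A hedged sketch combining those options (the keyword names follow the list
above; verify them against your installed client version):

```python
# Page through a very large plan, pushing the filter down and skipping
# per-file column statistics.
page = client.catalog.plan(
    namespace="robotics",
    table="episodes",
    filter_predicate={"eq": ["task", "pick_screwdriver"]},
    max_files=1000,             # page size
    file_offset=0,              # advance on later calls to fetch the next page
    include_column_stats=False,
)
```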

`client.catalog.read(...)` returns a PyArrow table by default and also accepts:

- `read_as="pandas"`
- `read_as="polars"`
- `read_as="numpy"`
- `read_as="lance"` for a lazy Lance dataset when the plan resolves to a single Lance dataset path
- `read_as="pyarrow_parquet"` for a lazy `pyarrow.parquet.ParquetFile` when the plan resolves to a single Parquet file
- `read_as="daft"`
- `read_as="ray_dataset"`

For schemaless tables, those supported `read_as` conversions apply to the
flattened manifest table, not to the underlying file contents.

Use `client.catalog.from_manifest_table(...)` when you explicitly want to
materialize the referenced files into a structured dataset.
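
A hedged two-step sketch (the exact `from_manifest_table(...)` signature is an
assumption; the schemaless table name is illustrative):

```python
# Step 1: for a schemaless table, read(...) returns the flattened manifest
# (file paths plus metadata), not the file contents.
manifest = client.catalog.read(
    namespace="robotics",
    table="raw_episode_files",  # hypothetical schemaless table
)

# Step 2: explicitly materialize the referenced files into a structured
# dataset. Passing the manifest positionally and choosing the output format
# via read_as are assumptions about the from_manifest_table(...) signature.
dataset = client.catalog.from_manifest_table(manifest, read_as="pandas")
```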

## Supported Orchestration Path

The DeltaCAT client is the supported control plane for running managed work.

- `client.jobs.submit_compaction(...)`
- `client.jobs.submit_langolier(...)`
- `client.jobs.submit_retention(...)`
- `client.jobs.submit_gc_scan(...)`
- `client.jobs.submit_gc_full(...)`
- `client.jobs.submit_gc_outbox(...)`
- `client.jobs.submit_gc_archive(...)`
- `client.jobs.submit_gc_purge(...)`
- `client.jobs.submit_gc_restore(...)`
- `client.jobs.recover_pending_delete(...)`
- `client.jobs.audit_pending_delete(...)`
- `client.jobs.get_gc_stats(...)`
- `client.jobs.list_gc_exempt(...)`
- `client.jobs.submit_janitor(...)`
- `client.publications.create_crawler(...)` + `client.publications.run(...)`
- `client.transforms.create(...)` with `DiffSubscriber` + `client.transforms.run(...)` or `trigger(...)`
- `client.catalog.place(...)` + `client.catalog.replication_status(...)` + `client.catalog.unplace(...)`

## Managed Workflows

### Dispatch modes

Use [`DispatchMode`](./deltacat_client/enums.py) for `dispatch_mode`: `LOCAL` (run on the API host), `CUSTOM` (enqueue a job for external workers; the usual production choice), or `OSMO` (NVIDIA OSMO). For `CUSTOM`, workers pull work with `client.jobs.claim()`. See [Subscriptions, Publications, Transforms, and Pipelines](#subscriptions-publications-transforms-and-pipelines) for the full lifecycle.

### Periodic triggers

Trigger configuration is part of each subscription, publication, and transform.

Create or update each subscription, publication, or transform with:
- `trigger={"mode": ...}`
- `trigger={"mode": "schedule", "schedule": {...}}` for scheduled resources

Schedule shapes are strict:
- `{"kind": "interval", "interval_seconds": N}` only accepts `interval_seconds`
- `{"kind": "cron", "cron": "...", "timezone": "Area/City"}` accepts `cron` plus optional `timezone`
- mixed interval/cron fields are rejected instead of being silently ignored

Use the trigger types as follows:
- `manual`: never auto-trigger; only `run(...)` / `trigger(...)` starts work
- `event`: auto-trigger on upstream table updates (subscriptions and transforms)
- `schedule`: auto-trigger on the server-managed periodic scheduler
- `custom`: publication-only; intended for external orchestrators

Defaults:
- subscriptions default to `event`
- transforms default to `event`
- root publications default to `manual`

The API server runs the built-in pipeline schedule manager. Scheduled subscriptions, publications, and transforms do not require scheduler enable flags on the server, and each resource controls its own schedule independently. Scheduled work uses the same durable job path as manual triggers, so `DispatchMode.CUSTOM` produces claimable jobs for external workers.

You can change trigger policy after creation:

```python
client.subscriptions.update(
    "events-sub",
    trigger={
        "mode": "schedule",
        "schedule": {"kind": "interval", "interval_seconds": 300},
    },
)

client.publications.update(
    "crawl-root",
    trigger={
        "mode": "schedule",
        "schedule": {
            "kind": "cron",
            "cron": "30 8,12 * * *",
            "timezone": "America/Los_Angeles",
        },
    },
)

client.transforms.update(
    "events-diff",
    trigger={"mode": "manual"},
)
```

Auth for trigger-policy changes follows the resource auth model:
- subscriptions: creator, managers, or ADMIN
- publications: creator, managers, or ADMIN
- transforms: creator, managers, or ADMIN

The [annotations example](#example-annotations-pipeline) below uses a **scheduled root crawl publication** plus **event** transforms so each crawl completion updates the catalog and downstream stages fire automatically.

### Crawl as a Publication

Use a crawler publication when the crawl should be a named, managed DeltaCAT workflow:

```python
from deltacat_client import DispatchMode

pub = client.publications.create_crawler(
    publication_id="crawl-openstack-swift-primary",
    name="OpenStack Swift PRIMARY Crawl",
    sink_namespace="crawled",
    sink_table="openstack_swift_primary",
    crawl_params={
        "seed_paths": [{"path": "s3://data-home"}],
        "filesystem_id": "openstack_swift_fs",
        "storage_backend": "openstack_swift",
        "crawl_strategy": "adaptive",
        "max_parallelism": 32,
    },
    dispatch_mode=DispatchMode.CUSTOM,
)

run = client.publications.run(pub.publication_id)
print(run.success, getattr(run, "job_id", None))
```

The declared publication sink is authoritative. The client/server force the
crawler to write to `sink_namespace.sink_table`, even if the embedded
`crawl_params` contains a different namespace or table name.

### Diff as a Transform

Use a first-class transform when the diff should be part of a managed CDC pipeline:

```python
import json

from deltacat_client import DispatchMode

xform = client.transforms.create(
    transform_id="openstack-swift-primary-diff",
    name="OpenStack Swift PRIMARY Diff",
    source_tables=[{"namespace": "crawled", "table": "openstack_swift_primary"}],
    sink_tables=[{"namespace": "changes", "table": "openstack_swift_primary_events"}],
    subscriber_class=(
        "deltacat.compute.pipelines.subscriber.diff_subscriber.DiffSubscriber"
    ),
    subscriber_mode="version",
    config_json=json.dumps(
        {
            "subscriber_kwargs": {
                "primary_key": "file_path",
                "version_key": "modified_at",
            }
        }
    ),
    dispatch_mode=DispatchMode.CUSTOM,
)

run = client.transforms.run(xform.transform_id)
print(run.success)
```

### Maintenance Jobs

Use `client.jobs` for maintenance and operational workflows:

```python
client.jobs.submit_gc_scan(
    seed_paths=["s3://bucket-a", "s3://bucket-b"],
    filesystem_id="amlfs01",
    max_depth=5,
)

client.jobs.submit_gc_full(
    pending_delete_root="s3://bucket/pending-delete",
    dry_run=True,
    source_types=["gc_scanner"],
)

client.jobs.submit_gc_outbox(
    outbox_root="s3://bucket/outbox",
    dry_run=True,
)

client.jobs.submit_gc_purge(
    pending_delete_root="s3://bucket/pending-delete",
    dry_run=True,
)

recoverable = client.jobs.recover_pending_delete(
    pending_delete_root="s3://bucket/pending-delete",
)

audit = client.jobs.audit_pending_delete(
    pending_delete_root="s3://bucket/pending-delete",
)

stats = client.jobs.get_gc_stats(filesystem_id="amlfs01")
exempt_dirs = client.jobs.list_gc_exempt()

client.jobs.submit_langolier(namespace="robotics", table="episodes")
client.jobs.submit_retention(namespace="robotics", table="episodes")
client.jobs.submit_janitor(dry_run=True)
```

## AI Agent / MCP Integration

DeltaCAT exposes an MCP (Model Context Protocol) server that enables AI agents
like Claude, Cursor, and other LLM-powered tools to interact with the catalog
through natural language. The MCP server exposes 46+ tools for catalog
management, data operations, and pipeline orchestration.

### Starting the MCP Server

```bash
# For Claude Code / Cursor:
claude mcp add deltacat -- python -m deltacat.server --catalog-root /path/to/catalog

# For HTTP-based MCP clients:
python -m deltacat.server --catalog-root /path/to/catalog --transport http --port 8080
```

### Key MCP Tools for Agents

**Discovery and inspection:**

| Tool | Purpose |
|------|---------|
| `deltacat_list_catalogs` | List available catalogs |
| `deltacat_list_namespaces` | List namespaces in a catalog |
| `deltacat_list_tables` | Discover tables in a namespace |
| `deltacat_describe_table` | Inspect schema, row count, size, properties |
| `deltacat_list_table_versions` | List version history for a table |
| `deltacat_preview_data` | Bounded server-side data sample |
| `deltacat_guide` | Get contextual help about DeltaCAT concepts |

**Catalog management:**

| Tool | Purpose |
|------|---------|
| `deltacat_create_namespace` | Create a new namespace |
| `deltacat_create_table` | Create a table with schema, properties, content type |
| `deltacat_alter_table` | Update table properties or description |
| `deltacat_rename_table` | Rename a table |
| `deltacat_drop_table` | Delete a table (with optional data purge) |
| `deltacat_drop_namespace` | Delete a namespace |
| `deltacat_rollback` | Restore a table to a prior point in time |
| `deltacat_truncate_table` | Remove all data from a table |

**Reading and writing data:**

| Tool | Purpose |
|------|---------|
| `deltacat_plan_read` | Build a read plan for client-side execution |
| `deltacat_write_data` | Small inline writes (< 10k rows) |
| `deltacat_stage_write` | Start a staged write session (large writes) |
| `deltacat_commit_write` | Commit staged files to the catalog |

**Placement and replication:**

| Tool | Purpose |
|------|---------|
| `deltacat_manage_placement` | Set, query, or remove data placement policies across roots |

**Pipelines and orchestration:**

| Tool | Purpose |
|------|---------|
| `deltacat_create_subscription` | Set up incremental data processing |
| `deltacat_create_transform` | Define a data transformation |
| `deltacat_create_publication` | Define sink tables |
| `deltacat_create_pipeline` | Group pipeline nodes into a named DAG |
| `deltacat_trigger_subscription` | Trigger async processing |
| `deltacat_run_subscription` | Run processing synchronously |

**Jobs and workers:**

| Tool | Purpose |
|------|---------|
| `deltacat_claim_job` | Claim a pending job (with worker tag affinity) |
| `deltacat_submit_preprocessing_job` | Submit a PackDS preprocessing job |

### Example: Building the Annotations Pipeline via MCP

An AI agent can build the complete annotations pipeline described in lab
design docs using MCP tools alone:

1. **Create the raw annotations table** using `deltacat_stage_write` +
   `deltacat_commit_write` (the agent writes schema-conformant Parquet files)

2. **Create a crawler subscription** to watch for new annotation files:
   ```
   deltacat_create_subscription(
       subscriber_id="annotations-crawler",
       source_tables=[{"namespace": "vision", "table": "crawler_root"}],
       subscriber_type="custom",
       subscriber_mode="version",
       dispatch_mode="custom",
       dispatch_config='{"required_worker_tags": ["cpu"]}'
   )
   ```

3. **Create a diff subscription** for change detection:
   ```
   deltacat_create_subscription(
       subscriber_id="annotations-diff",
       source_tables=[{"namespace": "vision", "table": "crawler_output"}],
       subscriber_type="diff",
       primary_key="episode_id",
       version_key="modified_timestamp_ns"
   )
   ```

4. **Create a transform** for curated output:
   ```
   deltacat_create_transform(
       transform_id="annotations-curate",
       name="Curate Annotations",
       source_tables=[{"namespace": "vision", "table": "annotations_diff"}],
       sink_tables=[{"namespace": "vision", "table": "annotations"}],
       select_columns=["episode_id", "key", "value", "start_time_ns", "end_time_ns"]
   )
   ```

5. **Wire into a pipeline** and trigger:
   ```
   deltacat_create_pipeline(
       pipeline_id="annotations-pipeline",
       name="Annotations Pipeline",
       node_ids=["annotations-curate"]
   )
   deltacat_trigger_subscription("annotations-crawler")
   ```

For the complete MCP tool reference and configuration guide, see
[deltacat/docs/mcp/README.md](https://github.com/ray-project/deltacat/blob/2.0/deltacat/docs/mcp/README.md).

## Common Workflows

### Catalog Management

Create, inspect, and manage namespaces and tables.

```python
from deltacat_client import Client

client = Client("http://localhost:8080")

# --- Namespaces ---

# Create a namespace
client.catalog.create_namespace(namespace="robotics")

# Check existence
if client.catalog.namespace_exists(namespace="robotics"):
    print("Namespace exists")

# Get namespace details
ns = client.catalog.get_namespace(namespace="robotics")
print(ns.properties)

# Update namespace properties
client.catalog.alter_namespace(
    namespace="robotics",
    properties={"team": "manipulation", "env": "prod"},
)

# --- Tables ---

# Create a table with properties
client.catalog.create_table(
    namespace="robotics",
    table="episodes",
    content_type="parquet",
    table_properties={"owner": "alice", "retention_days": "90"},
)

# Check existence
if client.catalog.table_exists(namespace="robotics", table="episodes"):
    print("Table exists")

# Inspect a table
desc = client.catalog.describe_table(namespace="robotics", table="episodes")
print(f"Schema: {desc.schema}")
print(f"Rows: {desc.total_records}")
print(f"Properties: {desc.properties}")

# Update table properties
client.catalog.alter_table(
    namespace="robotics",
    table="episodes",
    table_properties={"retention_days": "180"},
)

# List table version history
versions = client.catalog.list_table_versions(
    namespace="robotics", table="episodes",
)
for v in versions:
    print(f"Version: {v.version}")

# Rename a table
client.catalog.rename_table(
    namespace="robotics",
    table="episodes",
    new_name="episodes_v1",
)

# Rollback to a prior point in time (nanosecond timestamp)
client.catalog.rollback_table(
    namespace="robotics",
    table="episodes_v1",
    as_of=1711497600000000000,  # nanosecond epoch
)

# Truncate all data (keeps table metadata)
client.catalog.truncate_table(namespace="robotics", table="episodes_v1")

# Drop a table (purge=True also deletes data files)
client.catalog.drop_table(namespace="robotics", table="episodes_v1", purge=True)

# Drop a namespace
client.catalog.drop_namespace(namespace="robotics", purge=True)
```

### Data Operations

Direct reads, optional read plans, inline writes, retention, and compaction status.

```python
# Read directly through the client
dataset = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
)

# Build a reusable plan when you want file-level metadata
plan = client.catalog.plan(namespace="robotics", table="episodes")
print(f"Files: {plan.total_files}, Records: {plan.total_records}")

# The full deltacat package can also execute the same thin plan
import deltacat as dc
thick_dataset = dc.read_table("episodes", namespace="robotics", plan=plan)

# Small inline write (< 10k rows, agent-generated data)
client.catalog.write_data(
    namespace="robotics",
    table="predictions",
    data=[{"episode_id": 1, "score": 0.95}],
)

# Check compaction status
status = client.catalog.compaction_status(
    namespace="robotics", table="episodes",
)

# Run retention cleanup (dry_run=True previews without deleting)
result = client.catalog.langolier_cleanup(
    namespace="robotics",
    table="episodes",
    dry_run=True,
)

# Get effective retention policy
policy = client.catalog.get_retention_policy(
    namespace="robotics", table="episodes",
)
```

### Auth

Inspect your identity and manage permissions.

```python
# Check your identity
me = client.auth.whoami()
print(f"User: {me.user_id}, Auth configured: {me.auth_configured}")

# Grant a role (admin only)
client.auth.grant(user_id="bob@example.com", role="WRITER")

# Grant scoped to a namespace
client.auth.grant(
    user_id="bob@example.com",
    role="READER",
    resource_type="namespace",
    resource_name="robotics",
)

# List grants
grants = client.auth.list_grants(user_id="bob@example.com")
for g in grants:
    print(f"{g.user_id}: {g.role} on {g.scope_type}/{g.scope_name}")

# The convenience facade returns only grant items. The raw REST API may also
# include informational metadata when auth is not configured.

# Revoke
client.auth.revoke(
    user_id="bob@example.com",
    resource_type="namespace",
    resource_name="robotics",
)
```

### Jobs

This workflow is intended for external workers. A worker claims a queued job,
keeps the job alive with heartbeats while processing, and reports the result
back to the DeltaCAT API server.

```python
from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    worker_id="gpu-01",
    worker_tags=["gpu", "arm64"],
    bearer_token="worker-token-1",  # Required in Direct Auth mode
)

job = client.jobs.claim()
if job is not None:
    # The claim response includes a job_token (when DELTACAT_JOB_SECRET is set
    # on the server). The client passes it automatically on subsequent calls.
    print(f"Claimed job {job.job_id}, token: {job.job_token}")

    # Keep the job lease alive while processing.
    client.jobs.start_heartbeat(job.job_id, interval_seconds=30)

    # Read source files from the job context, perform work, then report success.
    # Source-consuming jobs must include the updated watermark on completion.
    client.jobs.complete(
        job.job_id,
        records_processed=50000,
        watermark={"partition_watermarks": {"slot-a": 123}, "known_partitions": ["slot-a"]},
    )
```

If you need a blocking poller instead of a background heartbeat loop, use
`client.jobs.wait(job_id)`.

For richer progress inspection, use `client.jobs.get_progress(job_id)`.
The response separates worker heartbeat progress from structured server-side
progress events:

```python
progress = client.jobs.get_progress(job.job_id)

print(progress.progress_fraction)   # 0.5
print(progress.progress_message)    # "halfway"
print(progress.progress_events)     # {"source_node_run": JobProgressEntry(...)}
```

#### How jobs are created

Workers do not create jobs themselves. A **job** is the universal work
primitive in DeltaCAT: a typed work unit with a claim/heartbeat/complete
lifecycle. Jobs enter the shared claim pool through two submission paths:

**Pipeline-scheduled jobs** (declarative, incremental):
Subscriptions, transforms, and publications define repeatable workflows
that watch for data changes and process them incrementally. When a
pipeline action (trigger, run, redrive) fires, it creates a job.
Crawler, diff, relay, and custom subscriber jobs all use this path.

**Ad-hoc submitted jobs** (imperative, one-shot):
`submit_preprocessing_job()` creates a one-shot job for a specific
snapshot with specific parameters. Episode index builds are auto-
submitted on PackDS table commits. Future system operations (compaction,
GC, langolier) will also use ad-hoc submission.

Both paths produce jobs in the **same claim pool**. Workers see all
pending jobs and claim based on worker tags and dispatch config. The
worker-side lifecycle is always the same: claim, heartbeat, process,
then complete or fail.

**Target architecture:** Jobs are the foundational primitive. Pipelines
are orchestration over jobs. Every work unit -- whether from a pipeline
trigger, an ad-hoc submit, a compaction dispatch, or a GC scan -- is a
job with full lifecycle tracking. Today, a few execution paths still bypass
the job store, including inline local dispatch and fire-and-forget queue messages.

#### Routing jobs to the right workers

For `DispatchMode.CUSTOM`, set `dispatch_config` when you
create the resource and pass matching tags when workers claim jobs. The
job system uses a shared claim pool, so these filters are the main way to
separate CPU-only work from GPU-heavy work.

```python
import json

from deltacat_client import DispatchMode

transform = client.transforms.create(
    transform_id="annotations-transform",
    name="Annotations Transform",
    source_tables=[{"namespace": "vision", "table": "annotations_raw"}],
    sink_tables=[{"namespace": "vision", "table": "annotations_curated"}],
    subscriber_class="example.transforms.AnnotationTransform",
    dispatch_mode=DispatchMode.CUSTOM,
    dispatch_config=json.dumps(
        {
            "required_worker_tags": ["gpu", "arm64"],
            "allowed_worker_ids": ["trainer-usw2-pod-17"],
        }
    ),
)

job = client.jobs.claim(worker_tags=["gpu", "arm64"])
```

Supported affinity keys today:

- `required_worker_tags`
- `allowed_worker_ids`

#### Job Store Durability

The server defaults to a durable job store (`DELTACAT_JOB_STORE_BACKEND=deltacat`)
that persists job state across process restarts and supports multi-instance HA
behind a load balancer. Jobs go through five states: PENDING, RUNNING,
FINALIZING, COMPLETED, FAILED. The FINALIZING state is a crash-recovery
checkpoint for server-side catalog mutations after worker completion.

Custom worker jobs use a shared claim pool with `required_worker_tags` and
`allowed_worker_ids` as routing filters. This is sufficient for the initial
deployment but is not a full multi-queue scheduler.

#### Choosing a `worker_id`

`worker_id` is a stable identifier for the worker process claiming jobs. It is
not pre-registered with the server and it does not reserve work ahead of time.
A good value is something stable for the lifetime of the worker, such as:

- a Kubernetes pod name
- a VM or instance identifier
- a hostname plus process role
- a generated UUID persisted for the life of the process

Example:

```python
client = Client(
    "https://deltacat-api.example.com",
    worker_id="trainer-usw2-pod-17",
)
```

#### Can multiple workers use the same `worker_id`?

They should not.

Jobs are not associated with any worker until they are claimed. After a worker
claims a job, the server records the `worker_id` and expects subsequent
heartbeats, completion, and failure reports to come from that same identifier.
Using the same `worker_id` from multiple concurrent workers makes ownership and
recovery behavior ambiguous.

#### Job Authentication

When auth is enabled on the server, claiming a job requires WRITER permission.
In the default Direct Auth mode, workers satisfy that with a bearer token. In
Trusted Proxy mode, the upstream proxy supplies the authenticated identity
header and the client does not need to send `bearer_token`. The claim response
includes a per-job HMAC token (`job_token`) that is required on all subsequent
heartbeat, complete, and fail calls. The client handles this automatically:
the token is stored on the `ClaimedJob` object and passed as an `X-Job-Token`
header by the transport.

Tokens are bound to the specific claim instance `(job_id, worker_id, claimed_at)`
and auto-invalidate if the job is reclaimed (e.g., after heartbeat expiry).

```python
client = Client(
    "https://deltacat-api.example.com",
    worker_id="gpu-01",
    bearer_token="worker-token-1",  # Authenticates the claim
)

job = client.jobs.claim()
# job.job_token is automatically passed on heartbeat/complete/fail
```

#### Current limitation: no generic query-job submit API

`deltacat-client` does not yet expose a first-class `submit_query_job()` or
`submit_dataframe_script()` API. Supported paths are:

- `plan(...)` for scalable reads that your code executes client-side
- `stage(...)` + `commit(...)` for scalable writes
- subscriptions/publications/transforms/pipelines for repeatable managed flows

If you need to run arbitrary remote Python or dataframe code on a managed
cluster, use a dedicated server-side job contract.

### Catalog Read Planning

`plan` is the scalable path for reading table data. It resolves metadata
server-side and returns a typed `Plan` describing the files, schema, and
partition layout needed to execute the read client-side.

```python
from deltacat_client import Client

client = Client("https://deltacat-api.example.com", catalog="robotics-prod")

plan = client.catalog.plan(
    namespace="robotics",
    table="episodes",
    filter_predicate={
        "and": [
            {"eq": ["task", "pick_screwdriver"]},
            {"gte": ["episode_length", 32]},
        ]
    },
    limit=10000,
    include_stats=False,
)
all_files = plan.all_files()
image_files = plan.files_with_extension(".jpg", ".png")
partitions = plan.partition_summaries()
```

The structured `filter_predicate` format is a JSON AST. Supported operators:
- `eq`, `neq`, `gt`, `gte`, `lt`, `lte`
- `in`
- `between`
- `and`, `or`, `not`
- `is_null`

Examples:

```python
{"eq": ["task", "pick"]}
{"in": ["embodiment", ["gr1", "franka"]]}
{"between": ["episode_length", 32, 128]}
{"and": [{"eq": ["task", "pick"]}, {"gte": ["success_rate", 0.9]}]}
```

Most client code should call `client.catalog.read(...)` directly and let the
client generate the plan inline:

```python
from deltacat_client import Client

client = Client("https://deltacat-api.example.com", catalog="robotics-prod")
dataset = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
)
```

This path plans and executes through the shared `deltacat-io-core`
execution layer, including merge-on-read (MOR), lazy Parquet/Lance reads,
schema alignment, sort-aware ordering, and distributed reader outputs where
supported.

Use an explicit `ReadPlan` when you want to inspect or reuse the metadata:

```python
plan = client.catalog.plan(namespace="robotics", table="episodes")
dataset = client.catalog.read(plan=plan, read_as="pandas")
```

The thick `deltacat` package can also execute the same thin plan:

```python
import deltacat as dc

dataset = dc.read_table("episodes", namespace="robotics", plan=plan)
```

> [!NOTE]
> `deltacat-client` intentionally does not depend on the full `deltacat`
> runtime. `client.catalog.read(...)` does not require the thick
> `deltacat` package to be installed.

Typical uses for a `ReadPlan`:

- pass the plan into `client.catalog.read(plan=plan)` for direct execution
- keep planning separate from execution when metadata and data scans happen in
  different stages of your workflow
- pass the same plan into `dc.read_table(plan=plan)` when the full `deltacat`
  package is available and you want to stay on the thick public API
- iterate over file paths from `plan.all_files()`
- group work by logical partition with `plan.partition_summaries()`
- inspect modality or file type via `DataFile.content_type` and `DataFile.extension`
- hand the plan to a downstream reader that performs the actual data scan

`client.catalog.read(...)` executes through the shared thin-client execution
layer whether the plan was generated inline or passed explicitly. The plan
contract carries the metadata needed for direct execution. There is no separate
runtime bridge back into thick DeltaCAT for thin plan execution.

> [!IMPORTANT]
> The shared thin-client data execution/materialization path currently supports
> local paths and `s3://` paths only. Plans or staged writes that require
> `gs://` or `az://` on the shared IO path fail deterministically until
> dedicated backend support lands.

For very large tables:
- request plans in pages with `limit` and `offset`
- leave `include_stats=False` unless the client needs them
- keep the API server on the metadata path only and execute the read client-side

For PackDS/training tables at scale (1PB+):
- use `include_files=False` to skip file enumeration entirely — returns only
  `snapshot_id`, `packds_uri`, `episode_index`, `shard_manifest`, and
  `replication_status` (O(deltas), not O(files))
- use `preferred_root` to resolve file paths through a replicated root
  (see [Data Placement and Replication](#data-placement-and-replication))
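
For example (a minimal sketch; the root name is illustrative):

```python
# Metadata-only plan for a large PackDS table: no file enumeration, with paths
# resolved through a preferred replicated root when it holds a full copy.
plan = client.catalog.plan(
    namespace="training",
    table="episodes_v42",
    include_files=False,
    preferred_root="aws_s3_iad",
)
print(plan.snapshot_id, plan.packds_uri, plan.replication_status)
```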

### Staged Writes

Most applications should use `client.catalog.write(...)` and let the client
manage staging and commit automatically.

Use `stage(...)` when you need explicit control over where files are written,
when they are committed, or how prewritten files are registered. It returns a
typed `WriteSession` with the destination directory and write mode.

```python
from deltacat_client import Client

client = Client("https://deltacat-api.example.com", catalog="robotics-prod")

session = client.catalog.stage(
    namespace="robotics",
    table="embeddings",
    mode="merge",
    format="parquet",
)

entry = session.entry_for(
    "embeddings/output.parquet",
    records=1024,
)

result = client.catalog.commit(session=session, entries=[entry])
```

In practice, an application or worker using staged writes will:

1. call `stage(...)`
2. write one or more output files under `session.data_dir`
3. build `WriteEntry` values for those files
4. call `commit(...)`
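
A hedged end-to-end sketch of those steps, assuming `session.data_dir` resolves
to a writable local path in this deployment:

```python
import os

import pyarrow as pa
import pyarrow.parquet as pq

# 1. Stage a write session.
session = client.catalog.stage(
    namespace="robotics",
    table="embeddings",
    mode="merge",
    format="parquet",
)

# 2. Write an output file under the session's staging directory.
out_path = os.path.join(session.data_dir, "embeddings", "output.parquet")
os.makedirs(os.path.dirname(out_path), exist_ok=True)
pq.write_table(pa.table({"episode_id": [1, 2], "score": [0.91, 0.89]}), out_path)

# 3. Describe the file relative to the session, then 4. commit it.
entry = session.entry_for("embeddings/output.parquet", records=2)
result = client.catalog.commit(session=session, entries=[entry])
```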

`client.catalog.write(...)` is the higher-level convenience path. It accepts:

- PyArrow tables
- Pandas DataFrames
- Polars DataFrames
- local Parquet, Feather, ORC, or AVRO file paths or lists of those file paths
- local Lance dataset directories

It also manages session liveness automatically while local files are being
materialized, and temporarily heartbeats a bound transaction during the same
window when needed.

### Data Placement and Replication

DeltaCAT supports **multi-root catalogs** where data files are stored across
multiple storage backends (e.g., S3 in us-east-2, OpenStack Swift in PRIMARY, local
Lustre). The **placement API** lets you declare which roots should have copies
of a table's data and triggers replication automatically.

**Core concepts:**

- **Data Root**: A named storage backend in the catalog's `DataRootConfig`
  (e.g., `"aws_s3_iad"`, `"openstack_swift_primary"`). Each root has a URI prefix
  and optional connection configuration.
- **Placement Policy**: A table property that declares which data roots should
  have copies. Once set, new writes are automatically replicated by a
  background daemon. Stored on the table, applies to all table versions.
- **Backfill**: One-shot replication of existing data to a new root. Runs as a
  chunked, checkpointed orchestration job on dedicated planner/relay workers.
- **Replication Status**: Per-root progress tracking. Each delta (unit of data)
  is tracked individually in the Replica Ledger system table.
- **Preferred Root**: When reading via `plan`, you can request file paths
  resolved through a specific root. For training tables (PackDS), this uses
  a whole-snapshot gate: either ALL data + sidecars are available on the
  preferred root, or everything stays on the primary root.

```python
from deltacat_client import Client

client = Client("https://deltacat-api.example.com", catalog="robotics-prod")

# Set a placement policy: replicate to S3 IAD and backfill existing data
result = client.catalog.place(
    namespace="vision",
    table="annotations",
    roots=["aws_s3_iad"],
    backfill=True,       # replicate existing data (latest active version)
    set_policy=True,     # set ongoing policy for new writes
)
print(result)
# {"table": "annotations", "namespace": "vision", "roots": ["aws_s3_iad"],
#  "policy_set": true, "subscriptions_created": 1, "backfill_jobs": ["replica_backfill:..."]}
```

**Checking replication progress:**

```python
status = client.catalog.replication_status(
    namespace="vision",
    table="annotations",
)
print(status)
# {"table_ref": "vision.annotations",
#  "by_root": {"aws_s3_iad": {"complete": 95, "pending": 5, "failed": 0, "total": 100}},
#  "replicated_roots": [],
#  "policy": {"target_roots": ["aws_s3_iad"], "include_sidecars": true},
#  "backfill_status_by_root": {
#    "aws_s3_iad": {
#      "planner_job": {"job_id": "replica_backfill:...", "state": "completed"},
#      "transfer_shards": {"pending": 0, "running": 0, "finalizing": 0,
#                           "completed": 100, "failed": 0, "total": 100},
#      "checkpoint": {"last_partition_id": "...", "last_stream_position": 100},
#      "scope_total_deltas": 100,
#      "completed_deltas": 100,
#      "contiguous_completed_deltas": 100,
#      "blocked_on_gap": false
#    }
#  }}

# Scope to a specific table version:
status = client.catalog.replication_status(
    namespace="vision",
    table="annotations",
    table_version="42",
)
```

**Reading from the closest root:**

```python
# plan with preferred_root resolves file paths through the replicated root
plan = client.catalog.plan(
    namespace="vision",
    table="annotations",
    preferred_root="aws_s3_iad",
    include_files=False,  # PackDS training: skip file enumeration, O(deltas) only
)
# plan.packds_uri -> "s3://training-data-iad/.../v42.packds" (if fully replicated)
# plan.preferred_root_used -> true/false (whether the gate passed)
```

**Targeted backfill with selectors:**

```python
# Backfill a specific table version without setting ongoing policy:
client.catalog.place(
    namespace="vision",
    table="annotations",
    roots=["aws_s3_iad"],
    table_version="42",
    backfill=True,
    set_policy=False,  # one-shot replication only
)

# Backfill specific partitions:
client.catalog.place(
    namespace="vision",
    table="annotations",
    roots=["aws_s3_iad"],
    partition_filter=[
        {"partition_id": "uuid-abc-123"},
        {"partition_values": ["2026-03-01"]},
    ],
)
```

**Removing a root from the policy:**

```python
result = client.catalog.unplace(
    namespace="gear",
    table="annotations",
    root="aws_s3_iad",
)
# Does NOT delete replicated data — only stops future replication
```

For architecture details (Replica Ledger, ReplicaTriggerScanner daemon,
ReplicaSubscriber, backfill orchestration, worker pool routing), see the
[DeltaCAT Server README](../deltacat/docs/server/README.md) and
[CLAUDE.md](../CLAUDE.md) sections on Multi-Root Data Replication.

### Subscriptions, Publications, Transforms, and Pipelines

These resources define named, repeatable data workflows. They connect to the
job system: when a subscription or transform is configured with
`DispatchMode.CUSTOM`, triggering it creates a claimable job that external
workers pick up via `client.jobs.claim()`. (See [Managed workflows → Dispatch modes](#dispatch-modes) for a short overview.)

#### How the pieces fit together

```
Subscription (watches a source table for new data)
    → Transform (defines processing: custom code or column projection)
        → Publication (defines where output goes)
            → Pipeline (groups transforms into a DAG)
```

- **Subscription**: Tracks per-partition watermarks on a source table. When
  triggered, it detects new data since the last checkpoint. Configured with
  a `dispatch_mode`: `DispatchMode.LOCAL` (server runs it inline),
  `DispatchMode.CUSTOM` (creates a job for external workers), or
  `DispatchMode.OSMO` (OSMO compute pool).

- **Transform**: Binds a subscription (input) to a publication (output) with
  processing logic. Processing can be:
  - `select_columns`: Simple column projection (built-in, no custom code)
  - `subscriber_class`: Fully-qualified Python class implementing the
    `Subscriber` ABC (e.g., `"my_app.transforms.ConvertXDOF"`). The class
    receives the full DeltaCAT runtime context and can use any framework
    (Polars, Daft, PyArrow, Pandas, custom code).

- **Publication**: Declares where transform output lands (one or more sink
  tables) with provenance tracking.

- **Pipeline**: Groups transforms into a named DAG with topological execution
  order for coordinated redrive and status monitoring.

#### How pipelines create jobs

When using `DispatchMode.CUSTOM`:

1. You call `client.subscriptions.trigger(...)` or `client.subscriptions.run(...)`
2. The server detects new data and creates a `Job` in the durable job store
3. The job context includes source file paths, target staging paths, and credentials
4. An external worker calls `client.jobs.claim(...)` to receive the job
5. The worker processes data and calls `client.jobs.complete(...)`
   with the updated watermark for source-consuming jobs
6. The server commits the output, advances the watermark, and triggers
   downstream subscriptions automatically

#### Example: Annotations Pipeline

This example mirrors a real annotations pipeline design: raw
annotation files land in OpenStack Swift, a crawler indexes them, a diff subscriber
detects changes, and a custom worker converts them to the curated annotations
table format.

**Steady-state orchestration:** the root crawl publication uses `trigger={"mode": "schedule", ...}` and runs on the server-managed schedule manager (see [Periodic triggers](#periodic-triggers)). The crawl uses `CUSTOM` dispatch so **worker processes claim crawl jobs** from the job store. Downstream transforms omit `trigger` (default **`event`**): when the crawl commits new data to `annotations_crawled`, the server triggers the diff transform, then the convert transform.

```python
import json

from deltacat_client import Client, DispatchMode

client = Client(
    "https://deltacat-api.example.com",
    catalog="vision-prod",
    bearer_token="pipeline-admin-token",
)

# ── Stage 1: Crawl publication ──────────────────────────────────────
# Scheduled root crawl: the schedule manager dispatches jobs for each cron
# slot. CUSTOM dispatch → workers claim.

crawl_pub = client.publications.create_crawler(
    publication_id="annotations-crawl",
    name="Annotations Crawl",
    sink_namespace="vision",
    sink_table="annotations_crawled",
    crawl_params={"seed_paths": [{"path": "s3://annotation-bucket/annotations"}]},
    trigger={
        "mode": "schedule",
        "schedule": {
            "kind": "cron",
            "cron": "30 8,12 * * *",
            "timezone": "America/Los_Angeles",
        },
    },
    dispatch_mode=DispatchMode.CUSTOM,
    dispatch_config=json.dumps({"required_worker_tags": ["cpu"]}),
)

# ── Stage 2: Diff transform ─────────────────────────────────────────
# Compares consecutive crawler table versions to produce ADD/UPDATE/DELETE
# deltas. Diff jobs are memory-hungry, so pin them to dedicated workers.
# Default trigger is event-driven when annotations_crawled changes.

diff_transform = client.transforms.create(
    transform_id="annotations-diff",
    name="Annotations Diff",
    source_tables=[{"namespace": "vision", "table": "annotations_crawled"}],
    sink_tables=[{"namespace": "vision", "table": "annotations_diff"}],
    subscriber_class=(
        "deltacat.compute.pipelines.subscriber.diff_subscriber.DiffSubscriber"
    ),
    subscriber_mode="version",
    dispatch_mode=DispatchMode.CUSTOM,
    dispatch_config=json.dumps({
        "required_worker_tags": ["highmem"],
        "allowed_worker_ids": [
            "diff-worker-useast2-1",
            "diff-worker-useast2-2",
        ],
    }),
    config_json=json.dumps(
        {
            "subscriber_kwargs": {
                "primary_key": "episode_id",
                "version_key": "modified_timestamp_ns",
            }
        }
    ),
)

# ── Stage 3: Convert + Curate transform ─────────────────────────────
# Custom worker converts raw XDOF/LeRobot annotations to the canonical
# schema and writes curated output.

convert_transform = client.transforms.create(
    transform_id="annotations-convert-xform",
    name="XDOF/LeRobot to Canonical Annotations",
    source_tables=[{"namespace": "vision", "table": "annotations_diff"}],
    sink_tables=[{"namespace": "vision", "table": "annotations"}],
    subscriber_class="example.transforms.AnnotationConverter",
    dispatch_mode=DispatchMode.CUSTOM,
    dispatch_config=json.dumps({"required_worker_tags": ["cpu", "us-east-2"]}),
)

# ── Pipeline (groups all stages for coordinated management) ─────────

pipeline = client.pipelines.create(
    pipeline_id="annotations-pipeline",
    name="Annotations Pipeline",
    node_ids=["annotations-convert-xform"],
)

# No manual run()/trigger() in steady state: the publication schedule scanner
# dispatches crawl jobs; downstream transforms run on source-table updates.
# Optional one-shot bootstrap before the first scheduled bucket:
# client.publications.run(crawl_pub.publication_id)
```

**Worker side** (runs on a separate CPU fleet):

```python
import time

from deltacat_client import Client

worker = Client(
    "https://deltacat-api.example.com",
    worker_id="cpu-worker-us-east-2-pod-7",
    worker_tags=["cpu", "us-east-2"],
    bearer_token="worker-token-1",
)

while True:
    job = worker.jobs.claim()
    if job is None:
        time.sleep(5)
        continue

    worker.jobs.start_heartbeat(job.job_id, interval_seconds=30)

    # The job context tells the worker what to read and where to write.
    # example.transforms.AnnotationConverter.process_deltas() handles the
    # actual conversion logic using any Python framework.
    for table_ref, files in job.data_access.items():
        for file_info in files.get("files", []):
            print(f"Processing: {file_info['path']}")

    worker.jobs.complete(
        job.job_id,
        records_processed=1000,
        watermark={"partition_watermarks": {"slot-a": 1}, "known_partitions": ["slot-a"]},
    )
```

#### Pipeline operations

```python
# Check pipeline health
status = client.pipelines.status("annotations-pipeline")
print(f"Active: {status.raw.get('active_count')}")

# Inspect a subscription's watermark (how far it has processed)
wm = client.subscriptions.get_watermark("annotations-convert")

# Pause a subscription (stops processing until resumed)
client.subscriptions.pause("annotations-convert")
client.subscriptions.resume("annotations-convert")

# Redrive: reset the watermark and reprocess from scratch
client.subscriptions.redrive(
    "annotations-convert",
    rewind_watermark="reset",
)

# Cascade redrive: redrive this subscription and all downstream ones
client.subscriptions.redrive(
    "annotations-crawler",
    cascade=True,
    dry_run=True,  # Preview the plan first
)
```

Creating a subscription, transform, publication, or pipeline only registers the
resource definition. Work is not executed until a control-plane action such as
`trigger`, `run`, or `redrive` is invoked.
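
For example (the subscription ID is illustrative):

```python
# Registration alone does nothing; an explicit control-plane action starts work.
client.subscriptions.trigger("annotations-crawler")  # async (creates a job under CUSTOM dispatch)
# or, synchronously:
# client.subscriptions.run("annotations-crawler")
```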

### Training Data / PackDS Workflow

DeltaCAT manages training datasets stored in PackDS format -- an
episode-oriented training data layout backed by
[Lance](https://lancedb.github.io/lance/) columnar storage. A PackDS
dataset contains a `steps.lance` directory where each row is a training
step, grouped into episodes (e.g., robot manipulation trajectories).

The training workflow lets you register PackDS datasets, get training
metadata via `plan`, and submit preprocessing jobs for episode
filtering and shard assignment.

**Key terms:**

- **Episode index sidecar**: A Parquet file placed next to the PackDS
  data that materializes per-episode metadata (frame counts, task labels,
  annotation sources). Avoids scanning the full Lance table for every
  training prep step.
- **Snapshot ID**: A 32-hex content-addressed fingerprint of the exact
  data files in a resolved read plan. Used to key all training sidecars
  so that the same data always produces the same artifacts.
- **Shard manifest**: A Parquet file mapping `(episode, step)` pairs to
  GPU shard indices for deterministic distributed data loading.

#### Register a PackDS dataset

Register an existing PackDS dataset by creating a table with `packds_uri`
in its properties, then committing the pre-written Lance files.

For DeltaCAT to recognize the table as **PackDS-backed** for training
workflows, both conditions must be true:

1. the committed files have Lance content type and `.packds` in their URLs
2. the table metadata carries `packds_uri`

```python
from deltacat_client import Client

client = Client("http://localhost:8080")

# 1. Create namespace (if needed) and table with packds_uri property.
client.catalog.create_namespace(namespace="training")
client.catalog.create_table(
    namespace="training",
    table="episodes_v42",
    content_type="lance",
    table_properties={
        "packds_uri": "s3://training-data/training/episodes_v42.packds",
    },
)

# If the table already exists, set packds_uri via alter_table:
# client.catalog.alter_table(
#     namespace="training",
#     table="episodes_v42",
#     table_properties={"packds_uri": "s3://training-data/training/episodes_v42.packds"},
# )

# 2. Build manifest entries describing the pre-written .packds/steps.lance files.
entries = [
    {
        "path": "s3://training-data/training/episodes_v42.packds/steps.lance/_versions/1.manifest",
        "record_count": 0,
        "content_length": 5321,
    },
    {
        "path": "s3://training-data/training/episodes_v42.packds/steps.lance/data/0000.lance",
        "record_count": 125000,
        "content_length": 18350211,
    },
    {
        "path": "s3://training-data/training/episodes_v42.packds/steps.lance/_indices/uuid.idx",
        "record_count": 0,
        "content_length": 81244,
    },
]

# For the full manifest-entry construction logic, see:
# deltacat.utils.lance_utils.build_manifest_entries()

# Commit the pre-written Lance files.
result = client.catalog.commit(
    namespace="training",
    table="episodes_v42",
    entries=entries,
    format="lance",
    mode="merge",
)
```

After a write that DeltaCAT recognizes as PackDS-backed, DeltaCAT
automatically submits an `index_build` job to build the episode-index
sidecar next to the PackDS data. Recognition depends on the two rules
above: Lance-backed committed files plus `packds_uri`/`.packds` hints.

#### Get training metadata via plan

```python
plan = client.catalog.plan(
    namespace="training",
    table="episodes_v42",
)

print(plan.snapshot_id)        # "a1b2c3d4..." (32-hex content-addressed ID)
print(plan.packds_uri)         # "s3://training-data/training/episodes_v42.packds"
print(plan.episode_index)      # {"uri": "s3://.../__episode_index/.../index.parquet",
                                #  "status": "ready"}  # or "pending", "missing", "failed"
```

The `snapshot_id` is a deterministic fingerprint of the exact data files
backing this table version. Two calls to `plan` that resolve the same
files always return the same `snapshot_id`. The `episode_index.status` field
tells you whether the sidecar Parquet file has been built (a short polling
sketch follows this list):

- `ready` -- sidecar exists, ready for use
- `pending` -- build job is running
- `missing` -- no job submitted yet (sidecar will be built on next write or reconciliation)
- `failed` -- build job failed (check server logs)
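
For scripted workflows, the same `plan` call can be polled until the sidecar is
ready. A minimal sketch, assuming `episode_index` is returned as the dict shown
above; the retry count and sleep interval are illustrative:

```python
import time

# Poll the read plan until the episode-index sidecar has been built.
for _ in range(30):
    plan = client.catalog.plan(namespace="training", table="episodes_v42")
    status = plan.episode_index["status"]
    if status == "ready":
        break
    if status == "failed":
        raise RuntimeError("episode index build failed; check server logs")
    time.sleep(10)  # "pending" or "missing": wait and retry
```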

#### Submit a preprocessing job

```python
response = client.jobs.submit_preprocessing_job(
    source_table={
        "namespace": "training",
        "table": "episodes_v42",
    },
    filter_predicate={"and": [
        {"eq": ["task", "pick_and_place"]},
        {"gte": ["total_frames", 50]},
    ]},
    shard_recipe={
        "shard_size": 64,
        "episode_sampling_rate": 1.0,
        "action_horizon": 8,
        "seed": 42,
    },
)

print(response.job_id)              # "preprocess:..."
print(response.snapshot_id)         # Frozen at submit time
print(response.packds_uri)          # "s3://training-data/training/episodes_v42.packds"
print(response.episode_index_exists)  # True/False
```

The shard recipe parameters control how episodes are distributed across
GPU shards: `shard_size` is the target number of training steps per shard,
`episode_sampling_rate` controls what fraction of episodes to include,
`action_horizon` determines the effective length of each episode (steps
that can start a complete action sequence), and `seed` ensures
reproducibility.

The job is submitted to the server's job queue. A preprocessing worker
claims it, builds/loads the episode index, applies the filter, computes
shard assignments, and writes a shard manifest.

Two preprocessing modes are supported:

- **Mode A**: omit `output_table` to compute only a shard manifest under
  the frozen source `snapshot_id`.
- **Mode B**: provide `output_table` to materialize a filtered PackDS
  snapshot as a DeltaCAT output, commit it with a deterministic
  `preprocessing_job_id`, write the shard manifest under the committed
  output `snapshot_id`, and enqueue the output episode-index build (see
  the sketch after this list).
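
A sketch of Mode B, assuming `output_table` takes the same `{namespace, table}`
shape as `source_table`; the exact field names and the output table name are
assumptions for illustration:

```python
# Mode B: also materialize a filtered PackDS snapshot into an output table;
# the shard manifest is then written under the committed output snapshot_id.
response = client.jobs.submit_preprocessing_job(
    source_table={"namespace": "training", "table": "episodes_v42"},
    output_table={"namespace": "training", "table": "episodes_v42_filtered"},  # assumed shape
    filter_predicate={"and": [
        {"eq": ["task", "pick_and_place"]},
        {"gte": ["total_frames", 50]},
    ]},
    shard_recipe={
        "shard_size": 64,
        "episode_sampling_rate": 1.0,
        "action_horizon": 8,
        "seed": 42,
    },
)
```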

#### Retrieve the shard manifest

After the preprocessing job completes, get the shard manifest via
`plan` with a `recipe_hash`:

```python
plan = client.catalog.plan(
    namespace="training",
    table="episodes_v42",
    recipe_hash="abc123",
)

print(plan.shard_manifest)  # {"uri": "s3://.../__shard_manifests/.../abc123.parquet"}
```

#### MCP tools for training

When using DeltaCAT via MCP (e.g., from Claude Code), the relevant tools are:

- `deltacat_plan_read` -- returns `snapshot_id`, `packds_uri`,
  `episode_index` status, and optional `shard_manifest`.
  Use `include_files=false` for PackDS tables at scale (returns metadata
  only, O(deltas) not O(files)). Use `preferred_root` to resolve paths
  through a replicated root.
- `deltacat_submit_preprocessing_job` -- submits a preprocessing job
  with filter predicates and shard recipes
- `deltacat_manage_placement` -- set data placement policies to replicate
  training data across storage roots (e.g., OpenStack Swift to S3 for GPU access)

#### Further reading

For the full architectural reference -- including the episode index schema,
shard manifest algorithm, structured filter predicate format, the
relationship between the episode index sidecar and Lance native indexes,
the row-layout contiguity invariant, and reconciliation internals -- see
[deltacat/compute/training/README.md](../deltacat/compute/training/README.md).

## Configuration

The root `Client` can carry common deployment settings:

- `catalog`
- `headers`
- `user_id`
- `bearer_token`
- `verify_ssl`
- `follow_redirects`
- `timeout`

```python
import os

from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    catalog="robotics-prod",
    bearer_token=os.environ["DELTACAT_TOKEN"],
    user_id="robotics-trainer@example.com",
    timeout=60,
    verify_ssl="/etc/ssl/certs/internal-ca.pem",
)
```

When `catalog` is configured on the root client, resource methods use it by
default unless an explicit `catalog=` override is provided.
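
For example, with the client configured as above (the `robotics-dev` catalog
name below is a placeholder):

```python
# Uses the client-level default catalog ("robotics-prod").
tables = client.catalog.list_tables(namespace="robotics")

# Explicit per-call override of the default catalog.
dev_tables = client.catalog.list_tables(namespace="robotics", catalog="robotics-dev")
```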

Custom headers can be attached for tracing, correlation, or proxy deployments:

```python
client = Client(
    "https://deltacat-api.example.com",
    bearer_token="my-token",
    headers={
        "x-trace-id": "run-12345",
        "x-correlation-id": "pipeline-abc",
    },
)
```

> [!NOTE]
> Identity headers like `X-DeltaCAT-User` are only trusted when the server
> is configured with `DELTACAT_TRUST_IDENTITY_HEADERS=true` (Trusted Proxy
> mode). For direct auth, use `bearer_token` instead.
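
For completeness, a Trusted Proxy sketch -- only meaningful when the proxy
terminates authentication and the server sets
`DELTACAT_TRUST_IDENTITY_HEADERS=true`:

```python
# Forward the caller identity as a header (Trusted Proxy mode only);
# for direct authentication, prefer bearer_token as noted above.
client = Client(
    "https://deltacat-api.example.com",
    headers={"X-DeltaCAT-User": "robotics-trainer@example.com"},
)
```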

The client supports both synchronous and asynchronous lifecycle management;
the synchronous context-manager form is shown below:

```python
from deltacat_client import Client

with Client("https://deltacat-api.example.com") as client:
    tables = client.catalog.list_tables(namespace="robotics")
```

The context manager form is useful for long-lived services and scripts that
want explicit control over the underlying HTTP client lifecycle.

## Data Model Highlights

`Client` exposes typed models for the most common server responses, including:

- `ClaimedJob` -- claimed job with `job_id`, `job_token`, `data_access`, `sink_upload`, `credentials`
- `JobStatus` -- job lifecycle state, progress, error
- `ReadPlan` -- file manifest with schema, partitions, and content types
- `ScanTask` -- individual scan unit within a read plan
- `DataFile` -- file path, size, content type, record count
- `WriteSession` -- durable write session with staged data path
- `WriteEntry` -- explicit file metadata for commit
- `SubscriptionInfo`, `SubscriptionWatermark`, `WatermarkState` -- subscription lifecycle
- `PublicationInfo`, `TransformInfo`, `PipelineInfo` -- orchestration resources
- `PipelineActionResult` -- pipeline operation results

These models are designed to make file- and manifest-oriented workflows easier
to work with in multimodal environments.

## Supported Read Paths

DeltaCAT distinguishes between:

- **Preview**: bounded server-side samples via `preview_data`
- **Plan**: lightweight metadata-only read planning via `plan`

For scalable reads, use `plan` and execute the resulting plan client-side.
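
A short sketch of the two paths; the `preview_data` argument names (`limit` in
particular) are assumptions, while the `plan` call matches the earlier examples:

```python
# Preview: bounded, server-side sample -- good for quick inspection.
sample = client.catalog.preview_data(
    namespace="robotics",
    table="episodes_v42",
    limit=100,  # assumed parameter name
)

# Plan: metadata-only read planning; execute the resolved plan client-side
# (for example with the deltacat-client[io] extras).
plan = client.catalog.plan(namespace="robotics", table="episodes_v42")
print(plan.snapshot_id)
```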

## Package Architecture

This distribution contains:

- `deltacat_client`: the public Python facade
- `deltacat_generated_client`: bundled generated REST bindings used internally

The generated package is treated as an implementation detail. Consumers should
import from `deltacat_client`.
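
In practice that means importing only the facade, for example:

```python
# Public, stable entry point:
from deltacat_client import Client

# Internal generated bindings -- avoid importing these directly:
# from deltacat_generated_client import ...  (implementation detail; may change)
```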

## Regenerating the Bundled Client

From the DeltaCAT repository root:

```bash
make export-openapi
make generate-deltacat-client
```

This exports the current REST OpenAPI schema and regenerates the bundled
low-level client bindings.

## Development Notes

The standalone `deltacat-client` package is tested, built, and release-hardened
independently of the main `deltacat` package:

- dedicated `make test-client` target
- generated-client drift check
- wheel build check
- fresh-environment install smoke test

See `README-development.md` in the repository root for the current development
targets and CI workflow.
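
For example, the client-only test suite runs from the repository root with:

```bash
make test-client
```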
