Metadata-Version: 2.4
Name: deltacat-client
Version: 0.1.13
Summary: Lightweight REST client and thin MCP scaffolding for the DeltaCAT API server.
Project-URL: Homepage, https://github.com/ray-project/deltacat
Author: Ray Team
License-Expression: Apache-2.0
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: <3.13,>=3.11
Requires-Dist: attrs>=22.2.0
Requires-Dist: deltacat-io-core<0.2,>=0.1.13
Requires-Dist: httpx<0.29.0,>=0.23.0
Requires-Dist: python-dateutil>=2.8.0
Requires-Dist: requests>=2.31
Provides-Extra: all
Requires-Dist: deltacat-io-core[all]<0.2,>=0.1.13; extra == 'all'
Provides-Extra: daft
Requires-Dist: deltacat-io-core[daft]<0.2,>=0.1.13; extra == 'daft'
Provides-Extra: io
Requires-Dist: deltacat-io-core[io]<0.2,>=0.1.13; extra == 'io'
Provides-Extra: lance
Requires-Dist: deltacat-io-core[lance]<0.2,>=0.1.13; extra == 'lance'
Provides-Extra: mcp
Requires-Dist: mcp>=1.0; extra == 'mcp'
Provides-Extra: pandas
Requires-Dist: deltacat-io-core[pandas]<0.2,>=0.1.13; extra == 'pandas'
Provides-Extra: polars
Requires-Dist: deltacat-io-core[polars]<0.2,>=0.1.13; extra == 'polars'
Description-Content-Type: text/markdown

<p align="center">
  <img src="https://github.com/ray-project/deltacat/raw/2.0/media/deltacat-logo-alpha-750.png" alt="deltacat logo" style="width:55%; height:auto; text-align: center;">
</p>

`deltacat-client` is the primary Python client package for
[DeltaCAT](https://github.com/ray-project/deltacat). It lets you read and
write tables, run jobs, and build data pipelines against a DeltaCAT API
server without installing the full storage, compute, or server runtime
stack.

The client talks to a DeltaCAT server over HTTP. Metadata operations
(schema validation, transaction management, compaction) stay server-side,
while large data reads and writes go directly between the client and cloud
storage using short-lived credentials the server vends on demand.


## Overview

The client is organized around a root `Client` object with resource-oriented subclients:

| Subclient | Purpose |
|-----------|---------|
| `client.catalog` | Namespaces, tables, read/write, transactions |
| `client.jobs` | Job submission, claiming, lifecycle, progress |
| `client.publications` | Incremental producers into the DeltaCAT Lakehouse (pipeline root nodes) |
| `client.subscriptions` | Incremental consumers from the DeltaCAT Lakehouse (pipeline leaf nodes) |
| `client.transforms` | Incrementally transform data within the DeltaCAT Lakehouse (pipeline intermediate nodes) |
| `client.pipelines` | Wire publications, transforms, and subscriptions into a connected DAG |
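
Everything hangs off a single connected `Client` instance. As a quick
orientation (the server URL is a placeholder, and each call is covered in
the sections below):

```python
from deltacat_client import Client

client = Client("http://localhost:8080")

# Resource-oriented subclients are plain attributes on the root client.
print(client.jobs.list())                                      # background jobs
df = client.catalog.read(table="cool_cats", read_as="pandas")  # table reads
```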


## Installation

```bash
pip install deltacat-client
```

Optional extras for local data materialization and MCP access:

```bash
pip install "deltacat-client[all]"    # Full client-side read/write stack
pip install "deltacat-client[pandas]" # Pandas DataFrame support
pip install "deltacat-client[polars]" # Polars DataFrame support
pip install "deltacat-client[daft]"   # Daft DataFrame support
pip install "deltacat-client[lance]"  # Lance dataset support
pip install "deltacat-client[mcp]"    # Typed async MCP HTTP client
```


## Getting Started

Before using the client, you need a running DeltaCAT API server. See
[Server Setup](docs/server-setup.md) for instructions.

DeltaCAT lets you manage **Tables** across one or more **Catalogs**. A
**Table** is a named collection of data files. A **Catalog** is a named
data lake that contains tables. For the full data model, see the
[DeltaCAT README](../README.md#getting-started).

### Quick Start

```python
from deltacat_client import Client
import pyarrow as pa

# Connect to a DeltaCAT server
client = Client("http://localhost:8080")

# Write data to a table. The table is created automatically — by default
# the client writes in ``mode="auto"`` with ``format="parquet"``.
data = pa.table({
    "id": [1, 2, 3],
    "name": ["Cheshire", "Dinah", "Felix"],
    "age": [3, 7, 5],
})
client.catalog.write(data, table="cool_cats")

# Read the data back
df = client.catalog.read(table="cool_cats", read_as="pandas")
print(df)
```

### Core Concepts

Expand the sections below to see examples of core client operations.

<details>

<summary><b>Authentication</b></summary>

When connecting to a production server with auth enabled, provide a bearer token:

```python
from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    bearer_token="your-api-token",
)
```

Admins can also onboard a new user, grant their initial access, and receive a
one-time token to share over a secure out-of-band channel:

```python
from deltacat_client import Client

admin = Client(
    "https://deltacat-api.example.com",
    bearer_token="admin-api-token",
)

created = admin.auth.create_user(
    user_id="newadmin@example.com",
    display_name="New Admin",
    email="newadmin@example.com",
    initial_role="ADMIN",
    resource_type="catalog",
    resource_name="*",
    issue_token_label="bootstrap",
    idempotency_key="create-newadmin-001",
)

bootstrap_token = created.token.token
print("Share this token once via a secure secret channel:", bootstrap_token)
```

The server validates your token and maps it to a user identity for permission checks (read, write, admin). The client automatically includes the token on every request. For data file access, the server vends short-lived STS credentials that the client uses to read from and write to cloud storage directly.

See [Configuration](docs/configuration.md) for the detailed auth model, bootstrap tokens, and role-based access control.

</details>

<details>

<summary><b>Reading Data</b></summary>

```python
# Read a table as a PyArrow table (the default format)
arrow_table = client.catalog.read(namespace="robotics", table="episodes")

# Read as Pandas, Polars, or Daft
df = client.catalog.read(namespace="robotics", table="episodes", read_as="pandas")
polars_result = client.catalog.read(namespace="robotics", table="episodes", read_as="polars")
daft_df = client.catalog.read(namespace="robotics", table="episodes", read_as="daft")

# Filter rows and limit results
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    filter_predicate={"eq": ["task", "pick_screwdriver"]},
    limit=5000,
)

# Time travel: read the table as it existed at a prior point in time.
# The as_of value is a nanosecond-precision Unix epoch timestamp.
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    as_of=1712697600000000000,
)
```

See [Reading and Writing](docs/reading-and-writing.md) for scalable reads,
lazy vs. eager materialization, and format-specific behavior.
For PackDS-backed training tables, see [Training Data](docs/training-data.md).

</details>

<details>

<summary><b>Writing Data</b></summary>

The client supports writing PyArrow tables, Pandas DataFrames, Polars DataFrames, Daft DataFrames, NumPy arrays, Ray Datasets, and local files (Parquet, CSV, TSV, PSV, Feather, JSON, ORC, AVRO, Lance).

```python
import pyarrow as pa

# Write data to a table. The defaults are mode="auto" (creates the
# table on first write, appends thereafter) and format="parquet".
data = pa.table({"episode_id": [1, 2], "score": [0.95, 0.87]})
client.catalog.write(data, namespace="robotics", table="predictions")

# Append more data — same call, same defaults.
data2 = pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]})
client.catalog.write(data2, namespace="robotics", table="predictions")

# Write a Pandas DataFrame
import pandas as pd
df = pd.DataFrame({"episode_id": [5], "score": [0.93]})
client.catalog.write(df, namespace="robotics", table="predictions")

# Override the defaults when you need a specific mode or format. Here we
# create a Lance-formatted table with explicit schema, partitioning, and
# sort order.
client.catalog.create_table(
    namespace="robotics",
    table="scored_episodes",
    schema=pa.schema([
        pa.field("episode_id", pa.int64()),
        pa.field("score", pa.float64()),
        pa.field("episode_day", pa.string()),
    ]),
    partition_scheme={"keys": [{"key": ["episode_day"], "transform": "identity"}]},
    sort_scheme={"keys": [{"key": ["episode_id"], "sort_order": "descending"}]},
    auto_create_namespace=True,
)

data = pa.table({
    "episode_id": [1, 2],
    "score": [0.95, 0.87],
    "episode_day": ["2025-01-15", "2025-01-15"],
})
client.catalog.write(data, namespace="robotics", table="scored_episodes")

# Evolve a table's schema after creation
client.catalog.alter_table(
    namespace="robotics",
    table="predictions",
    schema_updates={
        "operations": [
            {"op": "add", "field": {"name": "confidence", "type": "float64"}}
        ]
    },
)
```
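
The in-memory examples above generalize to local files. As a rough sketch
(the file path is a placeholder, and the exact path-based write options are
documented in [Reading and Writing](docs/reading-and-writing.md)):

```python
# Hedged sketch: pass a local Parquet file where an in-memory table would go.
# The positional path argument is an assumption here; see the Reading and
# Writing guide for the precise path-based write signature and
# create_if_missing behavior.
client.catalog.write(
    "/tmp/predictions_batch.parquet",   # hypothetical local file
    namespace="robotics",
    table="predictions",
)
```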

See [Reading and Writing](docs/reading-and-writing.md) for write modes,
staged writes, path-based writes with `create_if_missing`, and the full
list of supported formats and schema inputs.
For PackDS training data writes, see [Training Data](docs/training-data.md).

</details>

<details>

<summary><b>Live Feature Enrichment</b></summary>

Declare one or more merge keys on the table schema and subsequent writes
update existing records in place.

```python
import pyarrow as pa
import deltacat as dc
from deltacat_client import Client

client = Client("http://localhost:8080")

# Fields tagged with is_merge_key=True identify the key columns used to
# reconcile updates.
schema = dc.Schema.of([
    dc.Field.of(pa.field("user_id", pa.int64()), is_merge_key=True),
    dc.Field.of(pa.field("name", pa.string())),
    dc.Field.of(pa.field("age", pa.int32())),
    dc.Field.of(pa.field("job", pa.string())),
])

# First write creates a Lance table with the merge-key schema.
initial = pa.table({
    "user_id": pa.array([1, 2, 3], type=pa.int64()),
    "name": ["Jim", "Dinah", "Bob"],
    "age":  pa.array([30, 28, 45], type=pa.int32()),
    "job":  ["Teacher", "Painter", "Sailor"],
})
client.catalog.write(
    initial,
    namespace="demo",
    table="users",
    format="lance",
    create_if_missing={"schema": schema, "auto_create_namespace": True},
)

# Upsert: user_id 1 + 3 are updated; 4 and 5 are inserted.
upsert = pa.table({
    "user_id": pa.array([1, 3, 4, 5], type=pa.int64()),
    "name": ["Cheshire", "Felix", "Tom", "Simpkin"],
    "age":  pa.array([3, 2, 5, 12], type=pa.int32()),
    "job":  ["Tour Guide", "Drifter", "Housekeeper", "Mouser"],
})
client.catalog.write(
    upsert,
    namespace="demo",
    table="users",
    mode="merge",
    format="lance",
)

print(client.catalog.read(namespace="demo", table="users", read_as="pyarrow"))

# Delete: only the merge-key columns need to be supplied.
client.catalog.write(
    pa.table({"user_id": pa.array([3, 5], type=pa.int64())}),
    namespace="demo",
    table="users",
    mode="delete",
    format="lance",
)

# user_id 3 and 5 are gone; 1, 2, 4 remain.
print(client.catalog.read(namespace="demo", table="users", read_as="pyarrow"))
```

PackDS v5 tables do not create DeltaCAT merge keys by default. For
canonical PackDS training data, use `episode_id` identity partitioning
and `REPLACE_PARTITION`; explicit merge keys remain available when a
caller deliberately wants normal DeltaCAT `MERGE`/`DELETE` semantics.
See [Training Data](docs/training-data.md).

#### PackDS Tables Without Merge Keys

PackDS v5 tables are identity-partitioned by `episode_id` with no merge
keys by default. Live feature enrichment then runs episode-at-a-time:
discover the episodes you want to enrich from the auto-maintained
`<table>__episodes` companion, then rewrite each target episode with
`mode="replace_partition"`. The replaced partition is committed
atomically; sibling episodes are never touched.

```python
import pyarrow as pa
from deltacat_client import Client

client = Client("http://localhost:8080")

# 1. Initial PackDS write across multiple episodes. DeltaCAT auto-creates
#    the table with episode_id identity partitioning, no merge keys, and
#    the `<table>__episodes` companion.
initial = pa.table({
    "episode_id": [
        "ep_0001", "ep_0001", "ep_0001",
        "ep_0002", "ep_0002",
        "ep_0003", "ep_0003", "ep_0003", "ep_0003",
    ],
    "step_index": [0, 1, 2, 0, 1, 0, 1, 2, 3],
    "task": [
        "pick", "pick", "pick",
        "place", "place",
        "stack", "stack", "stack", "stack",
    ],
    "annotation": [None] * 9,
})
client.catalog.write(
    initial,
    namespace="robotics",
    table="episode_steps",
    format="lance",
    create_if_missing={
        "schema_def": [
            {"name": "episode_id", "type": "string"},
            {"name": "step_index", "type": "int64"},
            {"name": "task", "type": "string"},
            {"name": "annotation", "type": "string"},
        ],
        "layout": "packds",
        "default_content_type": "lance",
        "auto_create_namespace": True,
    },
)

# 2. Discover which episodes exist by reading the companion table.
#    Companion rows are maintained asynchronously after PackDS commits by
#    default, so recently written rows may be briefly absent while background
#    maintenance catches up.
#    The companion is a normal Parquet table; it is not subject to the
#    PackDS read-scope guard, so an unfiltered read is fine.
companion = client.catalog.read(
    namespace="robotics",
    table="episode_steps__episodes",
    read_as="pyarrow",
    columns=["episode_id", "total_frames"],
).sort_by("episode_id")

# Pick the first episode by id. The companion's read order is not
# guaranteed without a sort, so we sort first and pull both the id and
# the row count from the same row.
target_id = companion["episode_id"][0].as_py()
target_len = companion["total_frames"][0].as_py()

# 3. Enrich the target episode end-to-end via REPLACE_PARTITION. No
#    merge keys, no merge-on-read. The replacement is per-partition;
#    sibling episodes are unaffected. Note that ``layout="packds"`` is
#    only required at create time; subsequent writes pick the layout up
#    from the table's stored ``dataset_layout`` property.
enriched = pa.table({
    "episode_id": [target_id] * target_len,
    "step_index": list(range(target_len)),
    "task": ["pick"] * target_len,
    "annotation": [f"auto-labeled-step-{i}" for i in range(target_len)],
})
client.catalog.write(
    enriched,
    namespace="robotics",
    table="episode_steps",
    mode="replace_partition",
    format="lance",
)

# 4. Scoped read of the enriched episode. PackDS reads must be scoped
#    to a finite set of episode_ids (equality / IN) or a partition
#    filter; unscoped full-table PackDS reads are rejected by default
#    and require ``allow_full_packds_scan=True`` (or
#    ``DELTACAT_ALLOW_FULL_PACKDS_SCAN=true`` server-side) to opt in.
print(client.catalog.read(
    namespace="robotics",
    table="episode_steps",
    read_as="pyarrow",
    filter_predicate={"eq": ["episode_id", target_id]},
    columns=["episode_id", "step_index", "task", "annotation"],
))
```

Larger enrichment loops typically batch over a filtered companion read
(e.g. `filter_predicate={"in": ["episode_id", batch_ids]}`) so each
`replace_partition` write touches exactly one episode while the worker
pool processes many in parallel. Because each episode is its own
DeltaCAT partition, concurrent replace-partition writes to disjoint
episodes do not contend.
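
A minimal sketch of such a loop, reusing the calls from the example above
(the `enrich_episode` helper and the batch of episode ids are placeholders
you would supply):

```python
# Read companion rows for just the episodes in this batch.
batch_ids = ["ep_0001", "ep_0002", "ep_0003"]
companion = client.catalog.read(
    namespace="robotics",
    table="episode_steps__episodes",
    read_as="pyarrow",
    filter_predicate={"in": ["episode_id", batch_ids]},
    columns=["episode_id", "total_frames"],
)

# Rewrite each target episode with a single-partition replace. Disjoint
# episodes can be processed by parallel workers without contention.
for episode_id, total_frames in zip(
    companion["episode_id"].to_pylist(),
    companion["total_frames"].to_pylist(),
):
    enriched = enrich_episode(episode_id, total_frames)  # hypothetical helper
    client.catalog.write(
        enriched,
        namespace="robotics",
        table="episode_steps",
        mode="replace_partition",
        format="lance",
    )
```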

See [Training Data](docs/training-data.md) for the companion-table
schema and guidance on choosing between this no-merge-key flow and an
explicit-merge-key PackDS table.

</details>

<details>

<summary><b>Transactions</b></summary>

Transactions provide atomic multi-step operations with automatic heartbeating and rollback on failure.

```python
import pyarrow as pa

with client.transaction(commit_message="Backfill predictions") as tx:
    client.catalog.write(
        pa.table({"episode_id": [10, 11], "score": [0.88, 0.92]}),
        namespace="robotics",
        table="predictions",
        mode="add",
    )

    # Reads within the transaction see uncommitted writes
    df = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
    )
    print(f"Rows visible in transaction: {len(df)}")
# Transaction commits automatically on exit; aborts on exception
```

See [Transactions](docs/transactions.md) for time-travel reads, manual commit/abort, and transaction rules.

</details>

<details>

<summary><b>Jobs</b></summary>

DeltaCAT uses a durable job system for background work (compaction, data relay, subscription processing). The client can submit, monitor, and execute jobs.

```python
# List all jobs
jobs = client.jobs.list()
for job in jobs:
    print(f"{job.job_id}: {job.state}")

# Submit a compaction job
result = client.jobs.submit_compaction(table="predictions")
print(f"Submitted: {result.job_id}")

# Wait for it to complete
status = client.jobs.wait(result.job_id, timeout_seconds=120)
print(f"Final state: {status.state}")
```

Workers claim and execute jobs. Any process can act as a worker:

```python
# Claim the next pending job matching our worker tags
job = client.jobs.claim(worker_tags=["subscriber"])
if job:
    print(f"Claimed {job.job_id} (type: {job.context.get('job_type')})")

    # Report fine-grained progress and declare the deadline for the next chunk
    client.jobs.emit_event(
        job,
        event_name="batch_started",
        completed=1,
        expected=4,
        metadata={"batch": 1},
        heartbeat_timeout_seconds=300,
    )

    # Do the work...

    # Mark complete. Source-consuming jobs must include the advanced
    # watermark so the server can persist progress correctly.
    client.jobs.complete(
        job,
        records_processed=1000,
        watermark={
            "partition_watermarks": {"analytics.events": 42},
            "known_partitions": ["analytics.events"],
        },
    )
```

See [Jobs and Workers](docs/jobs-and-workers.md) for job types, worker
routing, heartbeat rules, retry semantics, and dispatch modes.

### Managed Ray Example

For subscription, transform, and publication jobs, `dispatch_mode="ray"`
submits the work onto DeltaCAT-managed Ray clusters instead of the shared
custom-worker claim pool. For large prebuilt environments, prefer
`docker.image` as the primary runtime artifact and use `payload` only for
small code/config overlays. The server still vends the DeltaCAT runtime from
its promoted runtime manifest so the remote cluster gets compatible
`deltacat`, `deltacat_client`, and `deltacat_io_core` packages.

```python
from deltacat_client import Client

client = Client("http://localhost:8080", bearer_token="token")

payload = client.jobs.stage_ray_payload(
    py_modules=["./shared_logic"],
)

ray_dispatch_yaml = """
cluster_shutdown_policy: terminate
docker:
  image: "registry.example.com/ml/runtime@sha256:0123456789abcdef"
payload:
  py_modules:
    - {py_module}
head_node_type: ray.head.default
available_node_types:
  ray.head.default:
    node_config:
      InstanceType: m7i.2xlarge
  ray.worker.default:
    min_workers: 1
    max_workers: 4
    node_config:
      InstanceType: m7i.2xlarge
""".format(
    py_module=payload.payload["py_modules"][0],
)

client.subscriptions.create(
    subscriber_id="episode_processor",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    subscriber_type="custom",
    dispatch_mode="ray",
    dispatch_config=ray_dispatch_yaml,
    trigger={"mode": "schedule", "schedule": {"interval_seconds": 300}},
)
```

The scheduler now auto-triggers the subscription every five minutes, DeltaCAT
auto-provisions the Ray cluster when a run starts, and
`cluster_shutdown_policy: terminate` tears the cluster back down after each run
by default. If the deployment advertises a compatible image profile, DeltaCAT
will execute inside the requested `docker.image` while keeping the promoted
DeltaCAT runtime bundle as the compatibility layer.

For reference, the resolved Ray launcher YAML produced by the template above is:

```yaml
cluster_shutdown_policy: terminate
docker:
  image: registry.example.com/ml/runtime@sha256:0123456789abcdef
payload:
  py_modules:
    - s3://.../shared_logic.tar.gz
head_node_type: ray.head.default
available_node_types:
  ray.head.default:
    node_config:
      InstanceType: m7i.2xlarge
  ray.worker.default:
    min_workers: 1
    max_workers: 4
    node_config:
      InstanceType: m7i.2xlarge
```

Use `payload` for small overlays only:

- experiment modules
- config bundles
- supplemental wheels

Do not treat `payload` as the primary transport for a large application image.

After launch, owner/admin callers can inspect cluster metadata and then use
the managed-Ray access helpers against a specific job:

```python
# `access` describes the currently discovered cluster shape for the job:
# region, cluster name, head instance id/private IP, worker ids, and the
# launch template metadata DeltaCAT used for the cluster.
access = client.jobs.get_managed_ray_access("job-id")
print(access.head_instance_id, access.head_private_ip)

# By default the SSM session starts on the Ray head node.
# The AWS CLI / Session Manager plugin will open an interactive session in
# your terminal, and the helper asks SSM to launch `bash -l` there so you land
# directly in a login shell on the instance.
client.jobs.start_managed_ray_ssm_session("job-id")

# For non-interactive inspection across the whole cluster, use send-command.
client.jobs.send_managed_ray_ssm_command("job-id", commands=["hostname"], target="all")
```

</details>

<details>

<summary><b>Publications</b></summary>

Publications are incremental producers that write new data into the
DeltaCAT Lakehouse. They sit at the root of a pipeline DAG and can be
triggered manually or fired by an upstream event.

```python
# Create a publication that writes to a sink table
client.publications.create(
    publication_id="episode_publisher",
    name="Episode Publisher",
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="local",
)

# Run the publication
result = client.publications.run("episode_publisher")
print(f"Published: {result}")
```

See [Pipelines](docs/pipelines.md) for publication configuration and DAG construction.

</details>

<details>

<summary><b>Subscriptions</b></summary>

Subscriptions are incremental consumers of DeltaCAT tables. They sit at
the leaves of a pipeline DAG, tracking a per-partition watermark so each
run picks up only new data.

```python
# Create a subscription that watches for new data in "raw_episodes"
client.subscriptions.create(
    subscriber_id="episode_processor",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)

# Trigger processing (dispatches a job to a subscriber worker)
client.subscriptions.trigger("episode_processor")

# Check watermark state
wm = client.subscriptions.get_watermark("episode_processor")
print(f"Watermark: {wm.watermark}")

# Pause / resume / delete
client.subscriptions.pause("episode_processor")
client.subscriptions.resume("episode_processor")
client.subscriptions.delete("episode_processor")
```

See [Pipelines](docs/pipelines.md) for subscription modes (delta vs. version), triggers, and redrive.

</details>

<details>

<summary><b>Transforms</b></summary>

Transforms are the intermediate nodes of a pipeline DAG. Each transform
reads from one or more source tables, applies processing logic, and
writes to one or more sink tables.

```python
# Create a transform: raw_episodes -> clean_episodes
client.transforms.create(
    transform_id="episode_cleaner",
    name="Episode Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="custom",
)

# Trigger transform processing
client.subscriptions.trigger("episode_cleaner")

# Pause / resume
client.transforms.pause("episode_cleaner")
client.transforms.resume("episode_cleaner")
```

See [Pipelines](docs/pipelines.md) for transform configuration, redrive, and rollback.

</details>

<details>

<summary><b>Pipelines</b></summary>

Pipelines wire publications, transforms, and subscriptions into a
connected DAG. When an upstream node completes, downstream nodes are
triggered automatically.

```python
# First, create connected pipeline nodes
client.publications.create(
    publication_id="ingest_pub",
    name="Raw Ingest Publisher",
    sink_tables=[{"namespace": "robotics", "table": "raw_data"}],
    dispatch_mode="local",
)
client.transforms.create(
    transform_id="clean",
    name="Data Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_data"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_data"}],
    dispatch_mode="custom",
)
client.subscriptions.create(
    subscriber_id="consume_clean",
    source_tables=[{"namespace": "robotics", "table": "clean_data"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)

# Preview the connected pipeline from an interior seed node
preview = client.pipelines.discover(seed_node_ids=["clean"])
print(preview.execution_order)

# Option A: persist exactly the previewed node_ids
client.pipelines.create(
    pipeline_id="etl_pipeline_pinned",
    name="ETL Pipeline (Pinned)",
    node_ids=preview.node_ids,
)

# Option B: create directly from seed node ids
client.pipelines.create(
    pipeline_id="etl_pipeline_seeded",
    name="ETL Pipeline (Seeded)",
    seed_node_ids=["clean"],
)

# Check pipeline status
status = client.pipelines.status("etl_pipeline_seeded")

# Pause / resume all nodes at once
client.pipelines.pause("etl_pipeline_seeded")
client.pipelines.resume("etl_pipeline_seeded")
```

See [Pipelines](docs/pipelines.md) for DAG construction, discovery
semantics, redrive, rollback, and stored-order validation.

</details>

<details>

<summary><b>Data Placement and Replication</b></summary>

DeltaCAT catalogs can span multiple storage backends (S3, SwiftStack, Lustre). Data placement lets you replicate tables across roots so readers access data from the closest location.

```python
# List available data roots for the catalog. The keys of this dict
# are the valid values to pass as `roots=` and `root=` below.
catalog_info = client.catalog.describe_catalog("default")
available_roots = catalog_info.data_roots or {}
for root_name, root_info in available_roots.items():
    print(root_name, root_info)

# Example output (dict[str, DataRootInfoSummary]):
# aws_s3_iad       {'root': 's3://bucket-iad/',        'storage_type': 's3',
#                   'region': 'us-east-1', 'endpoint_url': None}
# aws_s3_pdx       {'root': 's3://bucket-pdx/',        'storage_type': 's3',
#                   'region': 'us-west-2', 'endpoint_url': None}
# swiftstack_pdx   {'root': 's3://swiftstack-bucket/', 'storage_type': 's3',
#                   'region': None,        'endpoint_url': 'https://pdx.swiftstack.example.com'}

# Pick a target root from what the catalog actually advertises.
# In production you'd choose based on region, storage class, or policy.
target_root = next(iter(available_roots))

# Place a table on an additional storage root for replication
client.catalog.place(
    namespace="robotics",
    table="episodes",
    roots=[target_root],         # Replicate to this root
    backfill=True,               # Copy existing data too
)

# Check replication status
status = client.catalog.replication_status(namespace="robotics", table="episodes")
print(f"Roots: {status}")

# Read from the closest root (server resolves automatically)
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    root=target_root,            # Prefer this root for file paths
)

# Remove a replication target
client.catalog.unplace(namespace="robotics", table="episodes", root=target_root)
```

New writes are automatically replicated to all placed roots via a background subscriber. Reads with `root=` get file paths rewritten through the preferred root when data is available there.

See [Configuration](docs/configuration.md) for data root setup and multi-root catalog configuration.

</details>

<details>

<summary><b>Discovery Root Administration</b></summary>

Catalog admins can publish read-only discovery roots for system crawler, GC,
relay, replication, and placement jobs. Discovery roots are separate from
managed data roots, so they can describe authority roots such as `s3://`
without becoming valid table write targets.

```python
client.catalog.create_storage_binding(
    "default",
    name="swiftstack_pdx_root",
    storage_type="s3",
    storage_class="discovery",
    uri_scheme="s3",
    endpoint_url="https://pdx.swiftstack.example.com",
    region="us-west-2",
    credential_source={"kind": "aws_profile", "profile": "pdx"},
)

client.catalog.create_discovery_root(
    "default",
    name="swiftstack_pdx",
    binding_name="swiftstack_pdx_root",
    root_uri="s3://",
    authorization={"principals": ["service:system-crawler"]},
)

roots = client.catalog.list_discovery_roots(
    "default",
    include_disabled=True,
    include_history=True,
)
details = client.catalog.describe_discovery_root(
    "default",
    "swiftstack_pdx",
    include_history=True,
)

# Updates create a new immutable discovery-root version.
client.catalog.update_discovery_root(
    "default",
    "swiftstack_pdx",
    binding_name="swiftstack_pdx_root",
    root_uri="s3://",
    description="PDX authority root",
    authorization={"principals": ["service:system-crawler"]},
)

# Deletes are logical disables and remain visible with include_disabled=True.
client.catalog.delete_discovery_root("default", "swiftstack_pdx")
```

Discovery-root detail responses include the effective binding revision,
endpoint, region, authorization summary, lifecycle state, sanitized
credential-source metadata, and effective storage snapshot.

</details>

<details>

<summary><b>Scheduled Processing</b></summary>

Subscriptions and transforms can run on a schedule instead of being triggered manually. DeltaCAT supports interval-based and cron-based scheduling.

```python
# Process new data every 5 minutes
client.subscriptions.create(
    subscriber_id="metrics_ingester",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"interval_seconds": 300}},
)

# Process at 2am UTC daily using a cron expression
client.subscriptions.create(
    subscriber_id="nightly_aggregator",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"cron": "0 2 * * *", "timezone": "UTC"}},
)

# Event-driven: only runs when triggered manually or by an upstream pipeline node
client.subscriptions.create(
    subscriber_id="on_demand_processor",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "event"},
)
```

See [Configuration](docs/configuration.md) for trigger options and scheduling details.

</details>


## Agentic Access (MCP)

DeltaCAT ships with a built-in [Model Context
Protocol](https://modelcontextprotocol.io) server so AI agents (for
example, Claude Code) can browse catalogs, inspect schemas, plan reads,
and stage writes through natural language instead of hand-written Python.

Most hand-written code should just use the REST `Client(...)` shown
above. Reach for MCP when you want to:

- let an agent explore and operate on your catalog conversationally
- embed DeltaCAT as a tool in an agentic application
- use a typed async Python wrapper over the MCP HTTP surface
  (`deltacat-client[mcp]`) from code that is already agentic in shape

See the [MCP Server guide](../deltacat/docs/mcp/README.md) for the full
tool reference, the typed async client, and recipes for agent-driven
catalog workflows.
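
For orientation only, here is a rough sketch of raw MCP access using the
generic `mcp` SDK that the `mcp` extra installs (a recent SDK release with
the streamable HTTP transport is assumed, and the endpoint path is a
placeholder; the typed DeltaCAT wrapper documented in the MCP Server guide
is the recommended surface):

```python
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def list_deltacat_tools() -> None:
    # Connect to the (placeholder) DeltaCAT MCP endpoint over streamable HTTP.
    async with streamablehttp_client("http://localhost:8080/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            for tool in tools.tools:
                print(tool.name, "-", tool.description)


asyncio.run(list_deltacat_tools())
```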


## Server Setup

The DeltaCAT client connects to a DeltaCAT API server. For setup instructions, see:

- **[Server Setup Guide](docs/server-setup.md)** -- start a local or production server
- **[REST API Reference](../deltacat/docs/server/README.md)** -- full REST endpoint documentation
- **[MCP Server Guide](../deltacat/docs/mcp/README.md)** -- agentic access via MCP


## Additional Resources

| Guide | Description |
|-------|-------------|
| [Reading and Writing](docs/reading-and-writing.md) | Read plans, write modes, staged writes, supported formats |
| [Training Data](docs/training-data.md) | PackDS tables, episode indexes, shard manifests, distributed training |
| [Transactions](docs/transactions.md) | Transaction lifecycle, time travel, rules and limitations |
| [Jobs and Workers](docs/jobs-and-workers.md) | Job types, claiming, heartbeat, worker routing, authentication |
| [Pipelines](docs/pipelines.md) | Publications, transforms, subscriptions, DAGs, redrive |
| [Configuration](docs/configuration.md) | Auth, dispatch modes, triggers, data placement |
| [Maintainer Workflow](docs/maintainer-workflow.md) | Relationship between REST and MCP, generated bindings, facade updates, and validation guards |
| [Architecture](docs/architecture.md) | Package boundary, generated client, development notes |
| [Client Compatibility Runbook](../deltacat/docs/deploy/CLIENT_COMPATIBILITY_RUNBOOK_2026-04-12.md) | Promotion-time packaged client/server compatibility validation |

For the core DeltaCAT data model, storage architecture, and catalog APIs, see the [DeltaCAT documentation](../README.md#overview).
