Metadata-Version: 2.4
Name: snowflake-data-migration-orchestrator
Version: 0.10.0
Summary: Data migration orchestrator for Snowflake
Project-URL: Bug Tracker, https://github.com/snowflake-eng/migrations-data-validation/issues
Project-URL: Source code, https://github.com/snowflake-eng/migrations-data-validation/
Project-URL: homepage, https://www.snowflake.com/
Author-email: "Snowflake, Inc." <snowflake-python-libraries-dl@snowflake.com>
License: Snowflake Conversion Software Terms
Keywords: Snowflake,cloud,data,database,migration,orchestrator
Classifier: Development Status :: 5 - Production/Stable
Classifier: License :: Other/Proprietary License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: Implementation :: CPython
Requires-Python: >=3.11
Requires-Dist: jinja2>=3.1.0
Requires-Dist: pandas>=2.3.3
Requires-Dist: pydantic>=2.0
Requires-Dist: pyyaml>=6.0
Requires-Dist: snowflake-connector-python>=4.0.0
Requires-Dist: snowflake-data-validation
Requires-Dist: urllib3>=2.6.0
Provides-Extra: development
Requires-Dist: parameterized>=0.9.0; extra == 'development'
Requires-Dist: pytest-asyncio>=0.24.0; extra == 'development'
Requires-Dist: pytest-cov>=4.0.0; extra == 'development'
Requires-Dist: pytest-mock>=3.10.0; extra == 'development'
Requires-Dist: pytest>=7.0.0; extra == 'development'
Requires-Dist: ruff>=0.1.0; extra == 'development'
Description-Content-Type: text/markdown

# Snowflake Data Migration Orchestrator

[![Python](https://img.shields.io/badge/python-3.11+-blue)](https://www.python.org/downloads/)

The Cloud Data Migration feature of SnowConvert provides a fault-tolerant, scalable solution for moving data from external sources into **Snowflake**. This tool is specifically designed for cases where a user is moving data from a system they plan to decommission. For replication purposes, other solutions are available that might better fit your use case.

## Architecture

- One **Orchestrator** connects to the **Snowflake Account**.
  - It requires privileges to create and operate the **SNOWCONVERT_AI** database, in which metadata is stored.
- One or more **Workers** connect to the **Source System** and to the **Snowflake Account**.
  - **Workers** read data from the **Source System** and upload it to a **Snowflake Stage**.
  - **Workers** pick up tasks created by the **Orchestrator** and process them in parallel.
- Files uploaded to the Snowflake Stage are copied into the **Target Tables** using a [COPY INTO](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table) statement.
  - The [COPY INTO](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table) statement is submitted and monitored by the **Orchestrator**.

### Where to Deploy Orchestrator and Worker(s)?

The **Orchestrator** and **Worker(s)** can be deployed in multiple ways:

1. Both on [Snowpark Container Services](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/overview) (in the **Snowflake Account**).
2. Both on the **Customer's Environment** (custom hardware, virtual machines, containers, etc.).
3. **Orchestrator** on [Snowpark Container Services](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/overview) and **Worker(s)** on the **Customer's Environment** (or the other way around).

Requirements for the environment:

- The **Orchestrator** and **Worker(s)** are **Python packages**, so Python must be installed.
- The **Worker(s)** will typically require an **ODBC** driver to connect to the **Source System**.
- The **Orchestrator** needs to be able to connect to the **Snowflake Account**. The connection used must have privileges to create the **SNOWCONVERT_AI** database and create schemas/objects on that database.

## Setup

### Additional Configuration on Snowflake Account

When the Orchestrator starts, it automatically tries to set up the resources it needs in the **SNOWCONVERT_AI** database of your Snowflake Account, creating the database if it does not exist yet. This happens once and requires no action from the user. Some considerations:

- The **Orchestrator** should connect with a role that has privileges to create the **SNOWCONVERT_AI** database and its objects.
- Whenever the **Orchestrator** starts, it should use a role that allows it to interact with the **SNOWCONVERT_AI** database and its resources. The easiest way to guarantee this is to always run it with the same role that was used for creating **SNOWCONVERT_AI** in the first place.

## Usage

In general, for migrating data using this solution, you will need to:

1. Start the **Orchestrator**.
2. Start the **Worker(s)**.
3. Create a **Data Migration Workflow**.
4. Monitor the **Data Migration Workflow** (asynchronously) until completion.

A **Data Migration Workflow** is essentially an action/goal for the system to complete, such as migrating a specific set of tables with a given configuration. You can submit multiple workflows simultaneously and monitor them. The Orchestrator breaks Data Migration Workflows into smaller tasks. Normally, this also involves splitting a table into partitions before extracting its data and loading it to **Snowflake**.

### Starting the Orchestrator

After installation, start the Orchestrator by running:

```shell
python -m data_migration_orchestrator start
```

When invoked without a subcommand, `start` is assumed for backward compatibility.

The `start` command accepts optional flags:

| Flag | Default | Description |
|------|---------|-------------|
| `--log-destination` | `both` | Where to send logs: `stdout`, `file`, or `both`. |
| `--log-file` | `logs/data_migration_orchestrator.log` | Path to the log file (used when destination includes `file`). |
| `--log-level` | `INFO` | Logging level: `DEBUG`, `INFO`, `WARNING`, `ERROR`, or `CRITICAL`. |

Before running, make sure that the **SNOWFLAKE_CONNECTION_NAME** environment variable is set to a value that matches one of the connection names in your Snowflake [config.toml](https://docs.snowflake.com/en/developer-guide/snowflake-cli/connecting/configure-connections) or [connections.toml](https://docs.snowflake.com/en/developer-guide/snowflake-cli/connecting/configure-connections). That is the name of the connection used to connect to the **Target Snowflake Account**.

**Connection** session defaults use `SNOWFLAKE_DATABASE` and `SNOWFLAKE_SCHEMA` where your environment provides them (for example in SPCS). Those are separate from **metadata object locations**: by default, workflow and task-queue objects live in `SNOWCONVERT_AI.DATA_MIGRATION` and data-validation objects in `SNOWCONVERT_AI.DATA_VALIDATION`. To deploy metadata under different database or schema names, set:

- `CUSTOM_SNOWFLAKE_DATABASE_FOR_METADATA` (default `SNOWCONVERT_AI`)
- `CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_MIGRATION_METADATA` (default `DATA_MIGRATION`)
- `CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_VALIDATION_METADATA` (default `DATA_VALIDATION`)

Workers that call task-queue stored procedures must use the same `CUSTOM_*` values as the orchestrator.
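Because the Orchestrator and the Workers must agree on these values, any helper script that queries the metadata should resolve them the same way. A minimal sketch, assuming that unset or empty variables fall back to the documented defaults; `metadata_schemas` is a hypothetical name.

```python
import os
from collections.abc import Mapping

# Documented defaults for the metadata object locations.
DEFAULTS = {
    "CUSTOM_SNOWFLAKE_DATABASE_FOR_METADATA": "SNOWCONVERT_AI",
    "CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_MIGRATION_METADATA": "DATA_MIGRATION",
    "CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_VALIDATION_METADATA": "DATA_VALIDATION",
}


def metadata_schemas(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return the fully qualified data migration and data validation metadata schemas."""
    # Assumption: an empty variable is treated the same as an unset one.
    resolved = {key: env.get(key) or default for key, default in DEFAULTS.items()}
    database = resolved["CUSTOM_SNOWFLAKE_DATABASE_FOR_METADATA"]
    migration = resolved["CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_MIGRATION_METADATA"]
    validation = resolved["CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_VALIDATION_METADATA"]
    return {
        "data_migration": f"{database}.{migration}",
        "data_validation": f"{database}.{validation}",
    }
```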

The **Orchestrator** will run until you stop it. **Data Migration Workflows** need an active **Orchestrator** to be completed. However, the **Orchestrator** can be safely stopped at any point and resumed later (ongoing **Data Migration Workflows** will be resumed at that point).

### Starting the Worker(s)

After installation (`pip install snowflake-data-exchange-agent`, or `pip install snowflake-data-exchange-agent[teradata]` when the Worker connects to Teradata with the native driver), start a Worker by running:

```shell
data-exchange-agent run -c <configuration-file-path>
```

The `-c` flag can be omitted; in that case, the worker will look for a file called **configuration.toml** in your current directory. When invoked without a subcommand (`data-exchange-agent -c ...`), `run` is assumed for backward compatibility. See the [Worker Configuration](#worker-configuration) section below for the full specification.

You can also verify connectivity before starting:

```shell
data-exchange-agent test -c <configuration-file-path>
```

This executes `SELECT 1` on every configured source and target connection and reports the results.

**Workers** will run until you stop them. **Data Migration Workflows** and **Cloud Data Validation Workflows** need at least one active **Worker** to be completed. However, the **Workers** can be safely stopped at any point and resumed later (ongoing workflows will be resumed at that point).

### Creating a Data Migration Workflow

After installation, create workflows by running:

```shell
python -m data_migration_orchestrator create-data-migration-workflow <workflow-config-file-path> --source-platform <source-platform> [--name <workflow-name>] [--connection-name <connection-name>]
```

- The **Workflow Configuration** specification can be found in the [Workflow Configuration Reference](#workflow-configuration-reference) section.
- **`--source-platform` is required** for this subcommand. Supported values are `sqlserver`, `redshift`, `oracle`, and `teradata`.
- The workflow name must consist of alphanumeric characters and cannot start with a digit; underscores are accepted, as the default name `MY_WORKFLOW` shows. When `--name` is omitted, the name defaults to `MY_WORKFLOW`.
- `--connection-name` is optional. When omitted, the orchestrator uses the default Snowflake connection from environment variables. When provided, it should match a named connection in your **config.toml** or **connections.toml** file.

### Monitoring a Data Migration Workflow

Each **Workflow** goes through the following stages during its lifecycle:

1. **Pending**: No tasks have been created for this workflow yet.
2. **Executing**: Tasks have been created for this workflow, and some of them have not yet reached a terminal state (COMPLETED or FAILED).
3. **Completed**: All tasks have reached a terminal state (COMPLETED or FAILED).
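The stage is a function of the task statuses alone, which the sketch below makes explicit. It is illustrative only: `workflow_stage` is a hypothetical name, and it does not model the `paused` task status described under Managing Workflows.

```python
from collections.abc import Iterable

TERMINAL_TASK_STATUSES = {"COMPLETED", "FAILED"}


def workflow_stage(task_statuses: Iterable[str]) -> str:
    """Derive the workflow stage from the statuses of its tasks, per the rules above."""
    statuses = [status.upper() for status in task_statuses]
    if not statuses:
        return "Pending"  # no tasks created yet
    if all(status in TERMINAL_TASK_STATUSES for status in statuses):
        return "Completed"  # every task is COMPLETED or FAILED
    return "Executing"  # at least one task is still non-terminal
```

Note that a workflow whose tasks all failed is still **Completed**; check the error views to tell success from failure.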

In the **data migration metadata schema** (by default **SNOWCONVERT_AI.DATA_MIGRATION**) there are tables/views that can be queried to understand the status of one or more Workflows:

| View/Table | Description |
|------------|-------------|
| **WORKFLOW** | One row per workflow. Includes start/end time, status, and configuration. |
| **TABLE_PROGRESS_WITH_EXAMPLE_ERROR** | One row per table being migrated. Shows how many partitions are in each stage (extraction, loading, completed, or failed), along with related errors. Filterable by `WORKFLOW_ID`. |
| **DATA_MIGRATION_ERROR** | For each failed partition, contains the first known error. Filterable by `WORKFLOW_ID`. |
| **DATA_MIGRATION_WARNING** | Non-fatal warnings emitted during migration (e.g. type fallbacks, truncated columns). Filterable by `WORKFLOW_ID`. |
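These views can also be read programmatically. The hypothetical helper below works with any DB-API 2.0 cursor that uses the `pyformat` parameter style (the Snowflake Python connector's default), for example one from `snowflake.connector.connect(connection_name="<name>").cursor()`. Only `WORKFLOW_ID` is referenced because it is the one column documented here.

```python
def table_progress(cursor, workflow_id, schema: str = "SNOWCONVERT_AI.DATA_MIGRATION"):
    """Return TABLE_PROGRESS_WITH_EXAMPLE_ERROR rows for one workflow."""
    # Identifiers cannot be bound as parameters, so the (trusted) schema name is
    # interpolated; the workflow ID is passed as a bound value instead.
    sql = (
        f"SELECT * FROM {schema}.TABLE_PROGRESS_WITH_EXAMPLE_ERROR "
        "WHERE WORKFLOW_ID = %s"
    )
    cursor.execute(sql, (workflow_id,))
    return cursor.fetchall()
```

If you changed `CUSTOM_SNOWFLAKE_DATABASE_FOR_METADATA` or `CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_MIGRATION_METADATA`, pass the matching `schema`.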

In the same schema, there is a [Streamlit](https://docs.snowflake.com/en/developer-guide/streamlit/about-streamlit) dashboard called **DATA_MIGRATION_DASHBOARD** that can be used to monitor the workflows. The dashboard is organized around **tables** (the primary user-facing concept) rather than workflow executions. Its default tabs are:

- **📋 Tables** — primary view. One row per distinct table aggregated across every workflow that migrated it, with drill-down to per-execution detail. Surfaces per-table `TOTAL_ROWS` and the target `CLOUD_STAGE` bucket.
- **⚠️ Errors** — table-grouped expanders (and a flat-list fallback) over recent migration errors.
- **📊 Overview** — high-level KPIs including total rows migrated.

Every tab has a **⬇ Download CSV** button for its on-screen dataframe, and the sidebar exposes a **📦 Export snapshot (CSV)** that bundles every active section into one multi-section file. The sidebar table search prefilters every table-driven view. Toggle **Show advanced / debug views** in the sidebar to reveal the **Workflows** and **Tasks** (task queue) debug tabs plus raw internal columns.

### Managing Workflows

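The migration metadata schema (default **SNOWCONVERT_AI.DATA_MIGRATION**) exposes stored procedures for pausing, resuming, and cancelling work. They can be called directly from a Snowflake worksheet or any SQL client. The examples below qualify procedure names with the schema only, so they assume the session's current database is the metadata database (for example, after `USE DATABASE SNOWCONVERT_AI;`).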

#### Pause

Pausing moves all `pending` and `executing` tasks to `paused` status and revokes any active leases. Paused tasks cannot be picked up by executors until they are resumed.

```sql
-- Pause an entire workflow
CALL DATA_MIGRATION.PAUSE_WORKFLOW(<workflow_id>);

-- Pause a single table within a workflow
CALL DATA_MIGRATION.PAUSE_TABLE(<workflow_id>, '<TABLE_SOURCE_IDENTIFIER>');
```

`PAUSE_WORKFLOW` also sets the workflow status to **paused**. `PAUSE_TABLE` leaves the workflow status unchanged.

#### Resume

Resuming moves all `paused` tasks back to `pending` so executors can pick them up again.

```sql
-- Resume an entire workflow
CALL DATA_MIGRATION.RESUME_WORKFLOW(<workflow_id>);

-- Resume a single table within a workflow
CALL DATA_MIGRATION.RESUME_TABLE(<workflow_id>, '<TABLE_SOURCE_IDENTIFIER>');
```

`RESUME_WORKFLOW` also sets the workflow status back to **running**.

#### Cancel

Cancelling moves all `pending`, `executing`, and `paused` tasks to `failed` with the error message `Manually cancelled.`. The failure is cascaded to any blocked successor tasks, and cleanup tasks are unblocked so they can run.

```sql
-- Cancel an entire workflow
CALL DATA_MIGRATION.CANCEL_WORKFLOW(<workflow_id>);

-- Cancel a single table within a workflow
CALL DATA_MIGRATION.CANCEL_TABLE(<workflow_id>, '<TABLE_SOURCE_IDENTIFIER>');
```

`CANCEL_WORKFLOW` also sets the workflow status to **cancelled**.

> **Note:** The `TABLE_SOURCE_IDENTIFIER` parameter is the fully qualified source table name as it appears in the task scope (e.g., `MY_DB.MY_SCHEMA.MY_TABLE`). You can find it in the **SCOPE** column of the **TASK_QUEUE** table inside the `Table[...]` fragment.

## Cloud Data Validation Workflows

The orchestrator can run **Cloud Data Validation** workflows in addition to data migration. Validation work is queued as `data_validation` tasks; the same **Worker** package (`snowflake-data-exchange-agent`) executes them when the optional **snowflake-data-validation** dependency is available in the worker environment. Create a validation workflow with:

```shell
python -m data_migration_orchestrator create-data-validation-workflow <validation-config-file-path> --source-platform <source-platform> [--name <workflow-name>] [--connection-name <connection-name>]
```

- **`--source-platform` is required** for this subcommand. Supported values are `sqlserver`, `redshift`, `teradata`, `oracle`, and `postgresql`. Its value must match the `source_platform` field in the JSON configuration file.
- `--name` defaults to `MY_VALIDATION_WORKFLOW` when omitted.
- `--connection-name` is optional. When omitted, the orchestrator uses the default Snowflake connection from environment variables.
- New workflow rows are inserted into the **`WORKFLOW`** table in the **data migration metadata schema** (default **`SNOWCONVERT_AI.DATA_MIGRATION`**) with `WORKFLOW_TYPE` set to **`data-validation`**. Validation **results** and related objects are stored under the **data validation metadata schema** (default **`SNOWCONVERT_AI.DATA_VALIDATION`**, configurable with `CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_VALIDATION_METADATA`).

### Monitoring a Data Validation Workflow

In the **data validation metadata schema** (by default **SNOWCONVERT_AI.DATA_VALIDATION**) there are views that can be queried to understand the status of validation workflows:

| View | Description |
|------|-------------|
| **TABLE_PROGRESS** | One row per validated table. Summarizes overall validation status. Filterable by `WORKFLOW_ID`. |
| **TABLE_PROGRESS_DETAIL** | Per-table breakdown with partition-level L2/L3 status (VALID, INVALID, EXECUTION_ERROR). Filterable by `WORKFLOW_ID`. |
| **DATA_VALIDATION_ERROR** | Errors encountered during validation. Filterable by `WORKFLOW_ID`. |
| **DATA_VALIDATION_WARNING** | Non-fatal warnings (e.g. unsupported column types, metric exclusions). Filterable by `WORKFLOW_ID`. |

In the same schema, there is a [Streamlit](https://docs.snowflake.com/en/developer-guide/streamlit/about-streamlit) dashboard called **DATA_VALIDATION_DASHBOARD** that provides a visual overview of validation progress and results. Like the migration dashboard, it is organized around **tables**: the **📋 Tables** tab is the primary view, with a cross-workflow aggregate and a per-table drill-down whose quick links pre-filter the Schema, Metrics, Rows, and Cell tabs. The remaining tabs are Schema, Metrics, Rows, Cell, Table progress, and Errors; toggling **Show advanced / debug views** in the sidebar reveals the Tasks (task queue) debug tab. Every tab has a **⬇ Download CSV** button, and the sidebar exposes a **📦 Export snapshot (CSV)** that bundles every active section into one file.

### Validating Views

In addition to tables, Cloud Data Validation supports validating **views**. Place view entries in the top-level `views` array (same shape as `tables`). Entries under `views` are automatically tagged with `object_type = "VIEW"`, so there is no need to set `object_type` explicitly on each entry; however, you can also set `object_type` to `"VIEW"` directly on a `tables` entry if you prefer a flat list.
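The tagging rule amounts to merging the two arrays. The sketch below illustrates the documented behavior and is not the orchestrator's code; `validation_objects` is a hypothetical name, and it leaves `tables` entries without an explicit `object_type` untouched because their default is not stated here.

```python
def validation_objects(config: dict) -> list[dict]:
    """Combine `tables` and `views`, tagging every `views` entry with object_type VIEW."""
    # Copy entries so the caller's configuration is not mutated.
    objects = [dict(entry) for entry in config.get("tables", [])]
    objects += [{**entry, "object_type": "VIEW"} for entry in config.get("views", [])]
    return objects
```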

Views go through the same L1 (schema), L2 (metrics), and L3 (row/cell/hybrid) validation pipeline as tables, with one platform-specific difference:

- **Teradata views**: L1 schema validation uses a **basic** comparison (column existence and datatype only) because Teradata exposes view column metadata through `HELP COLUMN` rather than `DBC.Columns`. Precision, scale, length, nullable, and ordinal checks are not available for Teradata views. L2 metrics and L3 row/cell validation are fully supported.
- **Oracle views**: Validated identically to Oracle tables at all levels. Oracle exposes view column metadata through `ALL_TAB_COLUMNS` the same way as table metadata, so no materialization or special handling is needed.
- **Other platforms**: Views are validated identically to tables at all levels, since their catalogs expose view column metadata the same way as table metadata.

Partitioning (`column_names_to_partition_by`, `target_partition_size_rows`, `target_partition_size_mb`) works the same way for views as it does for tables. See the [view validation example](#view-validation-teradata-and-redshift) below for a complete workflow configuration.

### Data Validation Workflow Configuration (Top Level)

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| `source_platform` | String | Yes | Source dialect identifier (for example `sqlserver`, `redshift`, `teradata`, `oracle`, `postgresql`). Must match the `--source-platform` argument when creating the workflow from the CLI. |
| `target_platform` | String | No | Defaults to `Snowflake`. |
| `target_database` | String | No | Default target database name for tables when not specified per table. |
| `validation_configuration` | Object | No | Global validation levels and options (see below). |
| `comparison_configuration` | Object | No | Numeric tolerance and optional type mapping file. |
| `database_mappings` | Object | No | Map of source database names to Snowflake database names. |
| `schema_mappings` | Object | No | Map of source schema names to Snowflake schema names. |
| `tables` | Array | Yes | At least one table (or view) to validate. |
| `views` | Array | No | Additional view entries using the same shape as `tables`. |
| `use_snowflake_compute` | Boolean | No | When `true`, enables Snowflake-side computation paths where supported. Default `false`. |
| `target_partition_size_rows` | Integer | No | **Desired** rows per partition. **Mutually exclusive** with `target_partition_size_mb`. Must be greater than 0 when set. When both targets are omitted, Data Validation defaults to **200 MB** per partition. See [Partitioning](#partitioning-column_names_to_partition_by). Overridable per table. |
| `target_partition_size_mb` | Integer | No | **Desired** MB per partition. **Mutually exclusive** with `target_partition_size_rows`. Must be greater than 0 when set. When both targets are omitted, Data Validation defaults to **200 MB** per partition. See [Partitioning](#partitioning-column_names_to_partition_by). Overridable per table. |
| `use_snowpipe_for_results` | Boolean | No | When `true` (default), L2/L3 validation results are ingested into the shared results tables via Snowpipe. Workers issue `ALTER PIPE REFRESH` after uploading each partition's files and the orchestrator waits for `SYSTEM$PIPE_STATUS` to report zero pending files before running the evaluate step. Set to `false` to fall back to the legacy per-partition `COPY INTO` tasks. |
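The structural rules in this table can be checked locally before a workflow is created. A hypothetical pre-flight sketch covering only the documented constraints (required `source_platform` and non-empty `tables`, mutually exclusive positive partition targets); the orchestrator's own parser remains the authority.

```python
def check_top_level(config: dict) -> list[str]:
    """Return human-readable problems with a top-level Data Validation configuration."""
    problems = []
    if not config.get("source_platform"):
        problems.append("source_platform is required")
    if not config.get("tables"):
        problems.append("tables must contain at least one entry")
    rows = config.get("target_partition_size_rows")
    mb = config.get("target_partition_size_mb")
    if rows is not None and mb is not None:
        problems.append("target_partition_size_rows and target_partition_size_mb are mutually exclusive")
    for key, value in (("target_partition_size_rows", rows), ("target_partition_size_mb", mb)):
        # bool is a subclass of int in Python, so exclude it explicitly.
        if value is not None and (not isinstance(value, int) or isinstance(value, bool) or value <= 0):
            problems.append(f"{key} must be an integer greater than 0")
    return problems
```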

### validation_configuration (global defaults)

When `validation_configuration` is omitted, the orchestrator applies these defaults: **schema** and **metrics** validation are **enabled**; **row** validation is **disabled**; `row_validation_mode` defaults to **`row`**; `continue_on_failure` defaults to **`false`**; `max_failed_rows_number` defaults to **100**; `exclude_metrics` defaults to **`false`**; `apply_metric_column_modifier` defaults to **`true`**. Any field set here can be overridden per table via a nested `validation_configuration` on that table entry.

| Property | Type | Description |
|----------|------|-------------|
| `schema_validation` | Boolean | Level 1: schema / column consistency checks. |
| `metrics_validation` | Boolean | Level 2: statistical metrics comparison. |
| `row_validation` | Boolean | Level 3: row-level or cell-level data comparison. |
| `row_validation_mode` | String | For row validation: `row`, `cell`, or `hybrid` (hybrid requires early stopping). |
| `continue_on_failure` | Boolean | Whether to continue to the next validation level after a failure. |
| `max_failed_rows_number` | Integer | Cap on failed rows reported for L3 validation (must be greater than 0 when set). |
| `exclude_metrics` | Boolean | Whether to exclude unsupported metric columns. |
| `apply_metric_column_modifier` | Boolean | Whether to apply metric column modifiers. |
| `early_stopping` | Boolean | When `true`, L3 validation may stop early once the mismatch row count reaches the configured threshold (see [Early Stopping](#early-stopping-l3)). **Mandatory** when `row_validation_mode` is `hybrid`. |
| `early_stop_mismatch_row_threshold` | Integer | Mismatch row count at or above which pending L3 partition work is bulk-skipped. Must be greater than 0. Required when `early_stopping` is `true`. |
| `early_stop_check_interval_minutes` | Integer | Minutes between orchestrator poll ticks that check the mismatch count. Must be greater than 0. Required when `early_stopping` is `true`. |

### comparison_configuration

| Property | Type | Description |
|----------|------|-------------|
| `tolerance` | Number | Numeric comparison tolerance for metrics (must be greater than 0 when set). When omitted, the orchestrator applies a default of **0.001**. |
| `type_mapping_file_path` | String | Optional path to a custom type mapping file for comparisons. |

### Per-table / per-view entry (`tables` and `views`)

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| `fully_qualified_name` | String | Yes | Source object name (format depends on source platform). |
| `use_column_selection_as_exclude_list` | Boolean | No | Default `false`. |
| `column_selection_list` | String[] | No | Columns to include or exclude per `use_column_selection_as_exclude_list`. |
| `target_name` | String | No | Target object name override. |
| `target_database` | String | No | Per-table target database override. |
| `target_schema` | String | No | Per-table target schema override. |
| `where_clause` | String | No | Filter on the source side. |
| `target_where_clause` | String | No | Filter on the target side. |
| `index_column_list` | String[] | No | Columns used to align rows on the source. |
| `target_index_column_list` | String[] | No | Columns used to align rows on the target. |
| `column_mappings` | Object | No | Map of source column name to target column name. |
| `is_case_sensitive` | Boolean | No | Case sensitivity for identifiers. |
| `max_failed_rows_number` | Integer | No | Overrides global cap for this object. |
| `exclude_metrics` | Boolean | No | Per-object metrics exclusion override. |
| `apply_metric_column_modifier` | Boolean | No | Per-object modifier override. |
| `object_type` | String | No | Typically `TABLE` or `VIEW`. |
| `column_names_to_partition_by` | String[] | No | Columns used for range-based (NTILE) partitioning during validation. Without this, the table is processed as a single partition. |
| `target_partition_size_rows` | Integer | No | Per-table override for desired rows per partition. **Mutually exclusive** with `target_partition_size_mb`. Must be greater than 0. |
| `target_partition_size_mb` | Integer | No | Per-table override for desired MB per partition. **Mutually exclusive** with `target_partition_size_rows`. Must be greater than 0. |
| `validation_configuration` | Object | No | Nested object with the same fields as global `validation_configuration` to override defaults for this object only. |

### Partitioning (`column_names_to_partition_by`)

When `column_names_to_partition_by` is set, the orchestrator splits the table into range-based partitions. Both Data Migration and Data Validation share the same sizing logic:

1. **Compute a target rows-per-partition** from whichever user setting is provided (the two are mutually exclusive):
   - `target_partition_size_rows` — used as-is.
   - `target_partition_size_mb` — converted to rows via `target_mb / avg_row_mb`.
   - If neither is set, a default target applies: Data Validation defaults to **200 MB** per partition, and Data Migration uses platform-tuned values.
2. **Apply an internal cap.** System-imposed maximums (not user-configurable) limit partition size to safe infrastructure bounds. For Data Validation, caps on rows (up to 2,000,000 per partition), estimated megabytes per partition (up to 1,024), and cells (up to 100,000,000 per partition when the Snowflake target column count is known) apply **even if** you set `target_partition_size_rows` or `target_partition_size_mb`.
3. **Derive the partition count:** `ceil(row_count / effective_rows_per_partition)`, or 1 when the entire table fits in a single partition.

### Early Stopping (L3)

When a table has many partitions, L3 validation (row or cell comparison) can be expensive because every partition must be fully processed before the orchestrator evaluates results. **Early stopping** allows the workflow to abort remaining L3 partition work once enough mismatches have been detected, saving compute on both the source system and Snowflake.

#### How it works

1. When L3 partition tasks are created and `early_stopping` is enabled for the table (with valid threshold and interval), the orchestrator pushes an internal **poll task** that periodically checks how many mismatch rows have already been ingested into the Snowflake results table.
2. On each poll tick the orchestrator counts rows in the appropriate results table (`ROW_VALIDATION_RESULTS` for row and hybrid modes, `CELL_VALIDATION_RESULTS` for cell mode).
3. If the count **reaches or exceeds** `early_stop_mismatch_row_threshold`, all pending L3 partition tasks for the table are bulk-completed (skipped) so the workflow can converge without processing every partition.
4. If the count is still below the threshold, the poll reschedules itself after `early_stop_check_interval_minutes` minutes and checks again.
5. Polling also stops automatically when the L3 anchor task (the task that all partitions feed into) reaches a terminal state — meaning all partitions completed normally before the threshold was reached.
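A single poll tick can be modeled as a pure decision function. This sketch is illustrative, not the orchestrator's implementation: `PollState`, `early_stop_decision`, and `results_table` are hypothetical names, and checking the anchor task before the threshold is an assumed ordering.

```python
from dataclasses import dataclass


@dataclass
class PollState:
    mismatch_rows: int     # rows already ingested into the relevant results table
    anchor_terminal: bool  # has the L3 anchor task reached a terminal state?


def results_table(row_validation_mode: str) -> str:
    """Results table counted on each tick: cell mode uses CELL_*, row and hybrid use ROW_*."""
    return "CELL_VALIDATION_RESULTS" if row_validation_mode == "cell" else "ROW_VALIDATION_RESULTS"


def early_stop_decision(state: PollState, threshold: int) -> str:
    """Return 'stop_polling', 'skip_pending', or 'reschedule' for one poll tick."""
    if state.anchor_terminal:
        return "stop_polling"  # all partitions finished before the threshold was reached
    if state.mismatch_rows >= threshold:
        return "skip_pending"  # bulk-complete the remaining L3 partition tasks
    return "reschedule"  # poll again after early_stop_check_interval_minutes
```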

#### When to use it

- **Large tables with many partitions** where you expect mismatches: early stopping avoids running thousands of partition comparisons when the first few already confirm a data discrepancy.
- **Hybrid mode** (`row_validation_mode = "hybrid"`): early stopping is **mandatory**. The configuration parser rejects hybrid mode without it because the two-phase design (row-hash then cell drill-down) relies on the early-stop mechanism to converge the row-hash phase.

#### Configuration

Set these three fields together in `validation_configuration` (globally or per table):

```json
"validation_configuration": {
  "row_validation": true,
  "row_validation_mode": "row",
  "early_stopping": true,
  "early_stop_mismatch_row_threshold": 500,
  "early_stop_check_interval_minutes": 2
}
```

| Field | Description |
|-------|-------------|
| `early_stopping` | Master switch. Set to `true` to enable. |
| `early_stop_mismatch_row_threshold` | Number of ingested mismatch rows that triggers the bulk skip. Choose a value that represents "enough evidence" that the table has discrepancies — lower values stop faster but report fewer details. |
| `early_stop_check_interval_minutes` | How often the orchestrator checks the mismatch count. Shorter intervals react faster but add more metadata queries. |

All three fields are required when `early_stopping` is `true`. If the threshold or interval is missing, the workflow creation fails with a configuration error.
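These rules, together with the hybrid-mode requirement, can be pre-checked on the effective (inherited) configuration of each table. A hypothetical sketch of the documented constraints only; the orchestrator's configuration parser remains the authority.

```python
def check_early_stopping(cfg: dict) -> None:
    """Raise ValueError if an effective validation_configuration breaks the early-stopping rules."""
    if cfg.get("row_validation_mode") == "hybrid" and not cfg.get("early_stopping"):
        raise ValueError("hybrid row_validation_mode requires early_stopping = true")
    if not cfg.get("early_stopping"):
        return  # nothing else to check when early stopping is off
    for key in ("early_stop_mismatch_row_threshold", "early_stop_check_interval_minutes"):
        value = cfg.get(key)
        if value is None:
            raise ValueError(f"{key} is required when early_stopping is true")
        if value <= 0:
            raise ValueError(f"{key} must be greater than 0")
```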

#### Inheritance

Early stopping fields follow the same inheritance rules as other `validation_configuration` properties: per-table values override global values. The orchestrator resolves the effective values from the first source (table-level, then global-level) that provides them.

#### Interaction with hybrid mode

For hybrid mode (`row_validation_mode = "hybrid"`), the mismatch threshold counts **row-hash** mismatches only (rows in `ROW_VALIDATION_RESULTS`). The cell drill-down phase that follows is not counted toward the threshold. Once the row-hash phase converges — either because all partitions finished or because early stopping triggered — the orchestrator evaluates which partitions had mismatches and runs targeted cell comparisons only for those.
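The selection of partitions for the cell drill-down can be pictured as grouping row-hash mismatches by partition. A hypothetical sketch that assumes each row-hash mismatch can be attributed to a partition ID (a column name for that is not documented here); partitions skipped by early stopping produced no mismatch rows, so they never appear.

```python
from collections import Counter


def cell_drilldown_plan(row_hash_mismatch_partitions: list[int]) -> dict[int, int]:
    """Map each partition with row-hash mismatches to its mismatch count.

    Input: one partition ID per mismatch row from the row-hash phase.
    Only the partitions in the result would get a targeted cell comparison.
    """
    return dict(sorted(Counter(row_hash_mismatch_partitions).items()))
```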

## Advanced Features

### Redshift UNLOAD

For **Redshift**, it is recommended to use the **UNLOAD** extraction strategy. The main idea behind this is:

- Large query results are written directly to an **S3 Bucket** instead of being downloaded to the machine on which the **Worker** is running.
- On **Snowflake** side, an [External Stage](https://docs.snowflake.com/en/user-guide/data-load-s3-create-stage) is set up to reference the corresponding **S3 Bucket**, so that **COPY INTO** statements can be done directly from that stage.
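To make the two halves concrete, the hypothetical helper below renders a Redshift `UNLOAD` and a Snowflake `CREATE STAGE` that point at the same S3 prefix. It only illustrates the idea; the Worker generates its own statements. The Parquet format, IAM role authorization on the Redshift side, and the storage integration on the Snowflake side are all assumptions chosen for the example.

```python
def unload_statements(query: str, s3_prefix: str, iam_role_arn: str,
                      stage_name: str, storage_integration: str) -> tuple[str, str]:
    """Return (Redshift UNLOAD, Snowflake CREATE STAGE) statements for the same S3 prefix."""
    # UNLOAD takes the query as a string literal, so embedded single quotes are doubled.
    quoted_query = query.replace("'", "''")
    unload = (
        f"UNLOAD ('{quoted_query}') TO '{s3_prefix}' "
        f"IAM_ROLE '{iam_role_arn}' FORMAT AS PARQUET"
    )
    # The external stage reads the files UNLOAD wrote, so COPY INTO can load from it.
    stage = (
        f"CREATE STAGE IF NOT EXISTS {stage_name} URL = '{s3_prefix}' "
        f"STORAGE_INTEGRATION = {storage_integration} FILE_FORMAT = (TYPE = PARQUET)"
    )
    return unload, stage
```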

See the [Extraction Strategy](#extractionstrategy) section for configuration details.

### Incremental Synchronization

It is possible to migrate some tables and then re-migrate them in the future, moving only the data that has changed. See the [Synchronization Strategies](#synchronizationstrategy) section for the available strategies and their configuration.

## Query Tagging

Both the Orchestrator and the Worker automatically set Snowflake's [`QUERY_TAG`](https://docs.snowflake.com/en/sql-reference/parameters#query-tag) session parameter on every query they submit. Tags are compact JSON strings containing identifiers such as the workflow ID, task ID, and component version. You can use these tags to filter and attribute queries in [`QUERY_HISTORY`](https://docs.snowflake.com/en/sql-reference/account-usage/query_history):

```sql
SELECT query_text, query_tag, start_time
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE TRY_PARSE_JSON(query_tag):DMVF_WORKFLOW_ID IS NOT NULL
ORDER BY start_time DESC;
```

| Tag key | Present on | Description |
|---------|-----------|-------------|
| `DMVF_VERSION` | Infrastructure queries | Component package version. |
| `DMVF_WORKFLOW_ID` | Task-processing queries | Workflow that originated the task. |
| `DMVF_TASK_ID` | Task-processing queries | Individual task identifier. |
| `DMVF_ORCHESTRATOR_VERSION` | Orchestrator task-processing queries | Orchestrator package version. |
| `DMVF_WORKER_VERSION` | Worker task-processing queries | Worker package version. |
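If you export `QUERY_HISTORY` rows to Python, you can apply the same filter client-side. The sketch below assumes each row is a dict keyed by upper-case column names, which is how a Snowflake `DictCursor` typically returns unquoted columns. The tag values in the usage example are made up, and both function names are hypothetical.

```python
import json


def parse_query_tag(tag):
    """Return the QUERY_TAG as a dict, or None if it is empty or not a JSON object."""
    if not tag:
        return None
    try:
        parsed = json.loads(tag)
    except json.JSONDecodeError:
        return None  # tag set by something other than the Orchestrator/Worker
    return parsed if isinstance(parsed, dict) else None


def rows_for_workflow(rows, workflow_id):
    """Keep the rows whose tag carries the given DMVF_WORKFLOW_ID."""
    matches = []
    for row in rows:
        tag = parse_query_tag(row.get("QUERY_TAG"))
        if tag is not None and tag.get("DMVF_WORKFLOW_ID") == workflow_id:
            matches.append(row)
    return matches


rows = [
    {"QUERY_TEXT": "copy into orders", "QUERY_TAG": '{"DMVF_WORKFLOW_ID": "wf-1", "DMVF_TASK_ID": "t-7"}'},
    {"QUERY_TEXT": "select 1", "QUERY_TAG": ""},
    {"QUERY_TEXT": "call pull_tasks", "QUERY_TAG": '{"DMVF_VERSION": "0.10.0"}'},
]
print([r["QUERY_TEXT"] for r in rows_for_workflow(rows, "wf-1")])  # ['copy into orders']
```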

## Considerations and Recommendations

### Connecting to Snowflake with a PAT

[Programmatic Access Tokens](https://docs.snowflake.com/en/user-guide/programmatic-access-tokens) are recommended for the connections used by the Orchestrator and **Workers**. They avoid the need to authenticate repeatedly through the browser or an authenticator app. To use a PAT, you need to either establish a [Network Policy](https://docs.snowflake.com/en/user-guide/network-policies) or temporarily bypass the Network Policy requirement (this can be done from **Snowsight**).

### Running Orchestrator and/or Workers on SPCS

If you want to leverage **Snowflake** compute for these tasks, you can:

1. Prepare **Docker** images that use the Python modules and have the appropriate configuration.
2. Push those Docker images to an [Image Repository](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/working-with-registry-repository) in Snowflake.
3. Execute the **Orchestrator** and/or **Worker(s)** images using [Snowpark Container Services](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/overview).

Some considerations:

- It is recommended to execute them as **Services**, not **Jobs**.
- It is possible to run only one component (Orchestrator or Workers) in SPCS and the other on another platform.
- It is a good practice to monitor the SPCS service and suspend it when it is not being used.
- Depending on the network configuration of the **Source System**, you might need to configure an [External Access Integration](https://docs.snowflake.com/en/sql-reference/sql/create-external-access-integration) so that these services can connect to your Source System.

### Initial Testing

It is recommended to deploy the DDL for the tables you want to migrate before starting data migration. This ensures the target types match the behavior you want in those tables and their related views/procedures. Converting the DDL from your source dialect into **Snowflake SQL** can be done through the Code Conversion capabilities of [SnowConvert AI](https://docs.snowflake.com/en/migrations/snowconvert-docs/overview) and/or [Cortex Code](https://docs.snowflake.com/en/user-guide/cortex-code/cortex-code). If you don't deploy the DDL before starting data migration, the types will be inferred and might not be as accurate as desired.

Additionally, it is a good practice to move a few rows from each table as a test before starting the full migration. This helps detect configuration or connectivity issues early.

### Managing Workers

The time it takes to complete a workflow depends on many variables. One of the most influential is the number of workers (and the number of threads per worker), since together they determine how many extraction tasks can run in parallel. Consider the following:

1. It is not necessary to run two workers on the same machine. If you want more parallelism on one machine, increase the thread count instead.
2. Network bandwidth greatly affects the speed of workers and is effectively shared between threads of a worker.
3. Even with many workers/threads processing tasks in parallel, your source system might not have enough resources to handle the load.
4. You might want to keep a low worker count to avoid overloading your source system.
5. You might want to stop some (or all) of your workers at times when your source system is already overloaded by unrelated operations, to avoid disrupting those operations.
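As a rough, idealized estimate, the number of tasks that can run at once is the sum of `max_parallel_tasks` across all running workers. The sketch below turns that into a number of "rounds" for a given task count. It assumes every task takes the same time and ignores bandwidth and source capacity (points 2 and 3 above). Treat the result as a lower bound, not a prediction. Both function names are hypothetical.

```python
import math


def concurrent_task_slots(max_parallel_tasks_per_worker):
    """Each worker runs up to its max_parallel_tasks threads at once."""
    return sum(max_parallel_tasks_per_worker)


def idealized_rounds(task_count, max_parallel_tasks_per_worker):
    """Rounds needed if every task took the same time and nothing else was a bottleneck."""
    slots = concurrent_task_slots(max_parallel_tasks_per_worker)
    if slots <= 0:
        raise ValueError("at least one worker thread is required")
    return math.ceil(task_count / slots)


# Three workers configured with max_parallel_tasks = 8, 8 and 4.
print(idealized_rounds(100, [8, 8, 4]))  # 5
```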

## Workflow Configuration Reference

The workflow configuration file is a JSON object. Its structure is described below using named models -- each model's properties reference other models by name.

### WorkflowConfiguration (Top Level)

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| `schemaVersion` | String | No | Version of the configuration schema (e.g. "1.0.0"). Accepts formats "major", "major.minor", or "major.minor.patch". Defaults to "1.0.0" if omitted. |
| `tables` | [TableConfiguration](#tableconfiguration)[] | Yes | An array of table-specific configurations defining which tables to migrate and how. |
| `defaultTableConfiguration` | [TableConfiguration](#tableconfiguration) | No | Shared settings inherited by all tables. Table-specific values override these defaults (see [merging rules](#default-table-configuration-merging-rules) below). |

### TableConfiguration

Defines the settings for migrating a single table.

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| `source` | [SourceTargetIdentifier](#sourcetargetidentifier) | Yes | Identifies the source table. |
| `target` | [SourceTargetIdentifier](#sourcetargetidentifier) | Yes | Identifies the target table in Snowflake. |
| `columnNamesToPartitionBy` | String[] | No | Columns used to partition data during extraction. When omitted or empty, the table is extracted as a single unit (recommended only for very small tables). |
| `extraction` | [ExtractionStrategy](#extractionstrategy) | No | Configures how data is extracted from the source database. |
| `synchronization` | [SynchronizationStrategy](#synchronizationstrategy) | No | Configures incremental synchronization behavior. |
| `columnTypeMappings` | [ColumnTypeMapping](#columntypemapping)[] | No | Type conversions applied during migration. |
| `columnNameMappings` | [ColumnNameMapping](#columnnamemapping)[] | No | Column renaming mappings. |
| `primaryKeyColumns` | String[] | No | Primary key columns for the source table. Required when using `trackModifications` in the watermark synchronization strategy. |
| `targetPartitionSizeMb` | Integer | No | Target partition size in MB. **Mutually exclusive** with `targetPartitionSizeRows`. Must be greater than 0 when set. When both `targetPartitionSizeMb` and `targetPartitionSizeRows` are omitted, the orchestrator picks sizes automatically (**auto** mode). See [Partition Size](#partition-size). |
| `targetPartitionSizeRows` | Integer | No | Target partition size in rows. **Mutually exclusive** with `targetPartitionSizeMb`. Must be greater than 0 when set. When both `targetPartitionSizeMb` and `targetPartitionSizeRows` are omitted, the orchestrator picks sizes automatically (**auto** mode). See [Partition Size](#partition-size). |
| `whereClauseCriteria` | String | No | SQL-like filter to select a subset of rows (e.g., `"is_deleted = 0"`). |
| `loadSegmentation` | [LoadSegmentation](#loadsegmentation) | No | Splits a single COPY INTO into multiple parallel statements, each targeting a subset of staged files. |
| `loading` | Object | No | **(Experimental)** Loading strategy override. Accepts `{ "strategy": "snowpipe" }` to use Snowpipe for ingestion instead of the default warehouse-based `COPY INTO`. Behavior and configuration may change in future releases. |

#### Default Table Configuration Merging Rules

When `defaultTableConfiguration` is provided, its values are merged into each table entry using these rules:

- **Nested objects** (`source`, `target`, `synchronization`, `extraction`): Deep merge -- fields within are merged individually.
- **Collections** (`columnTypeMappings`, `columnNameMappings`, etc.): Table value replaces default entirely.
- **Scalars** (`whereClauseCriteria`): Table value overrides default.
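The following minimal Python sketch applies these rules, which helps when reasoning about what a table ends up with. It is not the orchestrator's implementation. It assumes nested objects are merged recursively, which is consistent with how `target.icebergConfig` is described below. It also assumes that every key not listed under **Nested objects** is replaced wholesale by the table value. `merge_table_config` and `deep_merge` are hypothetical names.

```python
import copy

# Keys the merging rules list as nested objects (deep-merged field by field).
NESTED_OBJECT_KEYS = {"source", "target", "synchronization", "extraction"}


def deep_merge(default, override):
    """Recursively merge dicts; non-dict values (including lists) from override win."""
    merged = copy.deepcopy(default)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def merge_table_config(default, table):
    """Apply defaultTableConfiguration to one entry of `tables`."""
    result = copy.deepcopy(default)
    for key, value in table.items():
        if key in NESTED_OBJECT_KEYS and isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = deep_merge(result[key], value)
        else:
            # Collections and scalars: the table value replaces the default entirely.
            result[key] = copy.deepcopy(value)
    return result


default = {
    "source": {"databaseName": "SRC", "schemaName": "dbo"},
    "synchronization": {"strategy": "watermark", "watermarkColumn": "updated_at"},
    "columnTypeMappings": [{"sourceType": "MONEY", "targetType": "DECIMAL(19,4)"}],
}
table = {
    "source": {"tableName": "orders"},
    "synchronization": {"trackModifications": True},
    "columnTypeMappings": [],
}
print(merge_table_config(default, table)["source"])
# {'databaseName': 'SRC', 'schemaName': 'dbo', 'tableName': 'orders'}
```

In this example, the empty `columnTypeMappings` list in the table entry removes the default `MONEY` mapping for that table. Collections are not combined.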

### SourceTargetIdentifier

Used by `source` and `target` in [TableConfiguration](#tableconfiguration) to identify a database object.

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| `databaseName` | String | Yes | Name of the database. |
| `schemaName` | String | Yes | Name of the schema. |
| `tableName` | String | Yes | Name of the table. |

The `target` object accepts two additional optional fields: `tableType` (`"native"` or `"iceberg"`) and `icebergConfig` (required when using Iceberg). If `tableType` is omitted or null, it defaults to **`"native"`** (standard Snowflake table). See [IcebergConfig](#icebergconfig-targeticebergconfig) and the [Redshift UNLOAD with Iceberg Tables](#redshift-unload-with-iceberg-tables) example.

### IcebergConfig (target.icebergConfig)

Used when `target.tableType` is **`"iceberg"`**. Fields are merged with `defaultTableConfiguration.target.icebergConfig`; table-level keys override defaults.

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| `catalog` | String | No | Default **`SNOWFLAKE`** for Snowflake-managed Iceberg. Use a **catalog integration** name for externally cataloged tables (for example AWS Glue). |
| `externalVolume` | String | For `catalog` **`SNOWFLAKE`** | Snowflake external volume for Iceberg data and metadata. |
| `baseLocationPrefix` | String | No | Optional path prefix for `BASE_LOCATION` when using Snowflake-managed Iceberg (`catalog` **`SNOWFLAKE`**). |
| `catalogTableName` | String | For external `catalog` | Fully qualified name of the table in the external catalog (for example `glue_db.my_table`). |
| `catalogSync` | String | No | Optional catalog integration used to sync Snowflake-managed metadata back to an external catalog. |
| `sourceDataStage` | String | No | Stage path starting with **`@`** pointing at existing Parquet files; used for **`copy_files`**-style loads with Snowflake-managed Iceberg. |
| `migrationStrategy` | String | No | One of **`catalog_link`**, **`convert_to_managed`**, **`copy_files`**. When omitted, the orchestrator infers a strategy from `catalog` and `sourceDataStage`. |

Snowflake account setup for Iceberg (external volumes, catalog integrations, stages, and privileges) follows Snowflake’s Iceberg documentation. Use the [Redshift UNLOAD with Iceberg Tables](#redshift-unload-with-iceberg-tables) example as a template for the JSON fields.
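The required-field rules in the table above can be expressed as a small pre-flight check. This sketch covers only what the table states; the orchestrator may validate more, or differently. It treats a missing `catalog` as `SNOWFLAKE` (the documented default) and compares the catalog name case-insensitively, which is an assumption. `iceberg_config_errors` is a hypothetical helper.

```python
VALID_MIGRATION_STRATEGIES = {"catalog_link", "convert_to_managed", "copy_files"}


def iceberg_config_errors(config):
    """Check a target.icebergConfig dict against the documented field requirements."""
    errors = []
    catalog = config.get("catalog", "SNOWFLAKE")  # documented default
    snowflake_managed = catalog.upper() == "SNOWFLAKE"  # case-insensitive: an assumption
    if snowflake_managed and not config.get("externalVolume"):
        errors.append("externalVolume is required when catalog is SNOWFLAKE")
    if not snowflake_managed and not config.get("catalogTableName"):
        errors.append("catalogTableName is required for an external catalog")
    stage = config.get("sourceDataStage")
    if stage is not None and not stage.startswith("@"):
        errors.append("sourceDataStage must start with '@'")
    strategy = config.get("migrationStrategy")
    if strategy is not None and strategy not in VALID_MIGRATION_STRATEGIES:
        errors.append(f"unknown migrationStrategy: {strategy}")
    return errors


glue_orders = {
    "catalog": "my_glue_catalog_integration",
    "externalVolume": "my_iceberg_ext_vol",
    "catalogTableName": "glue_db.orders",
    "migrationStrategy": "convert_to_managed",
}
print(iceberg_config_errors(glue_orders))  # []
```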

### ColumnTypeMapping

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| `sourceType` | String | Yes | Type name in the source system. |
| `targetType` | String | Yes | Target type in Snowflake. |

### ColumnNameMapping

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| `sourceName` | String | Yes | Column name in the source system. |
| `targetName` | String | Yes | Target column name in Snowflake. |

### ExtractionStrategy

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `strategy` | `"regular"`, `"unload"`, `"write_nos"`, or `"tpt"` | Yes | Extraction method. `"regular"` is the default; `"unload"` is for Redshift; `"write_nos"` and `"tpt"` are for Teradata. |
| `externalStage` | String | UNLOAD / WRITE_NOS only | Fully-qualified Snowflake external stage name (e.g., `"MY_DB.MY_SCHEMA.S3_STAGE"`). |

**regular** (default) -- Data is queried and downloaded through the Worker:

```json
"extraction": { "strategy": "regular" }
```

**unload** (Redshift only) -- Data is written to S3 via Redshift UNLOAD and loaded from an external stage:

```json
"extraction": { "strategy": "unload", "externalStage": "MY_DB.MY_SCHEMA.S3_EXTERNAL_STAGE" }
```

**write_nos** (Teradata only) -- Data is written directly to cloud object storage (S3, Azure Blob, or GCS) via the Teradata `WRITE_NOS` table function, then loaded from an external stage that points at the same location. Requires the worker to have `write_nos_*` settings under `[connections.source.teradata]`:

```json
"extraction": { "strategy": "write_nos", "externalStage": "MY_DB.MY_SCHEMA.TD_NOS_STAGE" }
```

**tpt** (Teradata only) -- The worker runs Teradata Parallel Transporter (`tbuild`) to export delimited data, converts it to Parquet locally, then uploads to the Snowflake internal stage (same path convention as `"regular"`). Does **not** use `externalStage`. Requires TTU on the worker and `tpt_*` settings under `[connections.source.teradata]`:

```json
"extraction": { "strategy": "tpt" }
```
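The platform and stage requirements above can be summarized as a check. This sketch encodes the documented rules only. The source platform names passed in (`"Redshift"`, `"Teradata"`, and so on) are compared case-insensitively, which is an assumption. `extraction_errors` is a hypothetical helper, not part of the package.

```python
VALID_STRATEGIES = {"regular", "unload", "write_nos", "tpt"}
# Strategies that load from an external stage and therefore need externalStage.
NEEDS_EXTERNAL_STAGE = {"unload", "write_nos"}
# Strategies restricted to a single source platform.
REQUIRED_PLATFORM = {"unload": "redshift", "write_nos": "teradata", "tpt": "teradata"}


def extraction_errors(extraction, source_platform):
    """Check an `extraction` object against the documented strategy rules."""
    strategy = extraction.get("strategy")
    if strategy not in VALID_STRATEGIES:
        return [f"unknown extraction strategy: {strategy!r}"]
    errors = []
    required = REQUIRED_PLATFORM.get(strategy)
    if required is not None and source_platform.lower() != required:
        errors.append(f"{strategy} is only supported for {required}")
    if strategy in NEEDS_EXTERNAL_STAGE and not extraction.get("externalStage"):
        errors.append(f"externalStage is required for {strategy}")
    return errors


print(extraction_errors({"strategy": "write_nos"}, "Teradata"))
# ['externalStage is required for write_nos']
```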

### Partition Size

Controls how large each partition should be during extraction. Configured at the [TableConfiguration](#tableconfiguration) level via two flat, mutually exclusive fields: `targetPartitionSizeMb` or `targetPartitionSizeRows`. When both are omitted, the system uses **auto** sizing.

| Form | Description |
|------|-------------|
| Both omitted (default) | **Auto**. The system picks optimal partition sizes based on the source platform, extraction strategy, and table size. |
| `"targetPartitionSizeMb": N` | Each partition targets approximately `N` megabytes of data. Must be greater than 0. |
| `"targetPartitionSizeRows": N` | Each partition targets `N` rows, regardless of data size. Must be greater than 0. |

Only one of `targetPartitionSizeMb` or `targetPartitionSizeRows` may be specified for a given table (setting both is a configuration error).

**Auto** (default) -- Omit both fields. The system selects partition sizes tuned for the platform and extraction strategy. Auto mode uses larger partitions for Redshift UNLOAD (where S3 handles large files well) and smaller partitions for ODBC-based extraction (SQL Server, Redshift REGULAR) where data flows through the Worker's memory.

**Fixed size in MB** -- Specify a target size per partition:

```json
"targetPartitionSizeMb": 2048
```

**Fixed row count** -- Specify a target number of rows per partition:

```json
"targetPartitionSizeRows": 500000
```
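The mutual-exclusion rule and the arithmetic behind a fixed size can be sketched as follows. The resulting partition counts are only estimates, because the MB target is approximate. In auto mode the orchestrator chooses the size itself, so the sketch does not try to predict it. Both function names are hypothetical.

```python
import math


def resolve_partition_size(table):
    """Return ("mb", n), ("rows", n) or ("auto", None) for a TableConfiguration dict."""
    mb = table.get("targetPartitionSizeMb")
    rows = table.get("targetPartitionSizeRows")
    if mb is not None and rows is not None:
        raise ValueError("targetPartitionSizeMb and targetPartitionSizeRows are mutually exclusive")
    for field, value in (("targetPartitionSizeMb", mb), ("targetPartitionSizeRows", rows)):
        if value is not None and value <= 0:
            raise ValueError(f"{field} must be greater than 0")
    if mb is not None:
        return ("mb", mb)
    if rows is not None:
        return ("rows", rows)
    return ("auto", None)


def estimated_partition_count(mode, target, table_size_mb, table_rows):
    """Rough number of partitions for a fixed target (not defined for auto mode)."""
    if mode == "mb":
        return max(1, math.ceil(table_size_mb / target))
    if mode == "rows":
        return max(1, math.ceil(table_rows / target))
    raise ValueError("auto mode: partition size is chosen by the orchestrator")


mode, target = resolve_partition_size({"targetPartitionSizeRows": 500000})
print(mode, estimated_partition_count(mode, target, table_size_mb=0, table_rows=1_200_000))  # rows 3
```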

### LoadSegmentation

Controls post-upload load segmentation. When a large number of files are staged for a single partition (common with Redshift UNLOAD), the orchestrator can split the COPY INTO into multiple parallel statements, each targeting a subset of files. All resulting COPY INTO tasks fan in to the same successor task.

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| `targetSegmentSizeMb` | Integer | Yes | Target total file size (in MB) per COPY INTO segment. |

When `loadSegmentation` is omitted, a single COPY INTO is used for all files in the partition (the default behavior).

```json
"loadSegmentation": { "targetSegmentSizeMb": 5000 }
```

Each segment will contain files whose total size does not exceed the target. Files larger than the target are placed in their own segment. The Snowflake `FILES` parameter limit (1,000 files) is also enforced per statement.
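One way to satisfy these rules is a greedy, order-preserving packing, sketched below. The orchestrator's actual grouping may differ; for example, it might sort files first. The sketch guarantees only the three stated properties:

- No segment exceeds the target unless it holds a single oversized file.
- Files larger than the target stand alone.
- No segment has more than 1,000 files.

`segment_files` is a hypothetical helper.

```python
FILES_PARAMETER_LIMIT = 1000  # Snowflake COPY INTO FILES list limit


def segment_files(files, target_mb, max_files=FILES_PARAMETER_LIMIT):
    """Group (name, size_mb) pairs into COPY INTO segments."""
    segments = []
    current, current_mb = [], 0
    for name, size_mb in files:
        if size_mb > target_mb:
            segments.append([name])  # oversized file gets its own segment
            continue
        # Close the current segment if this file would exceed the size or file-count limit.
        if current and (current_mb + size_mb > target_mb or len(current) >= max_files):
            segments.append(current)
            current, current_mb = [], 0
        current.append(name)
        current_mb += size_mb
    if current:
        segments.append(current)
    return segments


files = [("f1", 3000), ("f2", 1500), ("f3", 800), ("f4", 6000), ("f5", 4000)]
print(segment_files(files, target_mb=5000))  # [['f1', 'f2'], ['f4'], ['f3', 'f5']]
```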

### SynchronizationStrategy

Controls whether subsequent workflow runs perform a full re-extraction or only sync changed data.

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `strategy` | `"none"`, `"checksum"`, or `"watermark"` | Yes | The synchronization method. |
| `watermarkColumn` | String | Watermark only | Column name to track (must be monotonically increasing). |
| `trackModifications` | Boolean | No | If `true`, uses the primary key to deduplicate modified rows. Requires `primaryKeyColumns` in [TableConfiguration](#tableconfiguration). |

**none** (default) -- Full extraction on every run. No synchronization metadata is stored.

```json
"synchronization": { "strategy": "none" }
```

- **Use when:** Data is small, changes are unpredictable, or guaranteed consistency is needed.

**checksum** -- Computes a hash of all column values per partition. Only changed partitions are cleared and re-extracted.

```json
"synchronization": { "strategy": "checksum" }
```

- **Use when:** You need to detect any change but lack a reliable monotonic column (e.g., dimension tables).
- **Trade-offs:** Requires a checksum computation on the source for every partition on every run.

**watermark** -- Tracks a monotonic column (timestamp, ID, version) to sync only rows newer than the last observed maximum.

```json
"synchronization": { "strategy": "watermark", "watermarkColumn": "UPDATED_AT" }
```

- **Use when:** Your table has a reliable monotonic column that increases on insert/update (e.g., fact tables, event logs).
- **Limitation:** Watermark alone cannot currently track deletions. Support for this will be added in the future.
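The following sketch models the watermark strategy, and what `trackModifications` adds on top of it, on in-memory rows. It illustrates the semantics described above and is not the orchestrator's code. It assumes `trackModifications` behaves like an upsert keyed on `primaryKeyColumns`, where a changed row replaces the target row with the same key. All function names are hypothetical.

```python
def rows_after_watermark(rows, watermark_column, last_max):
    """Rows strictly newer than the last observed maximum (all rows on the first run)."""
    if last_max is None:
        return list(rows)
    return [row for row in rows if row[watermark_column] > last_max]


def next_watermark(changed_rows, watermark_column, last_max):
    """New maximum to store for the next run."""
    values = [row[watermark_column] for row in changed_rows]
    if last_max is not None:
        values.append(last_max)
    return max(values) if values else None


def upsert_by_primary_key(target_rows, changed_rows, primary_key_columns):
    """Assumed trackModifications behaviour: a changed row replaces the row with the same key."""
    def key(row):
        return tuple(row[column] for column in primary_key_columns)

    merged = {key(row): row for row in target_rows}
    for row in changed_rows:
        merged[key(row)] = row
    return list(merged.values())


target = [
    {"order_id": 1, "status": "new", "updated_at": 10},
    {"order_id": 2, "status": "new", "updated_at": 12},
]
source = [
    {"order_id": 1, "status": "shipped", "updated_at": 15},  # modified
    {"order_id": 2, "status": "new", "updated_at": 12},      # unchanged
    {"order_id": 3, "status": "new", "updated_at": 16},      # inserted
]
changed = rows_after_watermark(source, "updated_at", last_max=12)
synced = upsert_by_primary_key(target, changed, ["order_id"])
print([(r["order_id"], r["status"]) for r in synced])  # [(1, 'shipped'), (2, 'new'), (3, 'new')]
print(next_watermark(changed, "updated_at", last_max=12))  # 16
```

Without the upsert step, simply appending `changed` to the target would leave two rows for `order_id` 1. A row deleted at the source never appears in `changed`, which is the limitation noted above.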

### Quoting Identifiers

Identifiers that need quoting (or brackets) must include the quote characters explicitly, escaped as usual in JSON. For example: `"tableName": "\"MyCaseSensitiveTable\""`.
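If you generate the configuration programmatically, let the JSON library handle the escaping. The sketch below assumes Snowflake-style double-quoted identifiers, where an embedded double quote is doubled as in Snowflake SQL. `quote_identifier` is a hypothetical helper. Bracketed names such as `[Order Details]` need no JSON escaping.

```python
import json


def quote_identifier(name):
    """Wrap an identifier in double quotes, doubling any embedded double quote."""
    return '"' + name.replace('"', '""') + '"'


source = {"tableName": quote_identifier("MyCaseSensitiveTable")}
print(json.dumps(source))  # {"tableName": "\"MyCaseSensitiveTable\""}
```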

## Workflow Configuration Examples

### Basic Migration (SQL Server)

Migrates two tables with shared source/target schemas, type mappings, column renaming, watermark sync, and row filtering:

```json
{
  "defaultTableConfiguration": {
    "source": {
      "schemaName": "data_migration_cloud_test",
      "databaseName": "SampleStoreDB"
    },
    "target": {
      "schemaName": "data_migration_cloud_test",
      "databaseName": "samplestoredb"
    }
  },
  "tables": [
    {
      "source": { "tableName": "store_employee" },
      "target": { "tableName": "target_employee" },
      "columnNamesToPartitionBy": ["ID"]
    },
    {
      "source": { "tableName": "Sales_Simple" },
      "target": { "tableName": "Sales_Simple" },
      "columnNamesToPartitionBy": ["ID"],
      "columnTypeMappings": [
        { "sourceType": "MONEY", "targetType": "DECIMAL(19,4)" }
      ],
      "columnNameMappings": [
        { "sourceName": "id", "targetName": "old_id" },
        { "sourceName": "name", "targetName": "full_name" }
      ],
      "synchronization": {
        "strategy": "watermark",
        "watermarkColumn": "UPDATED_AT"
      },
      "targetPartitionSizeMb": 2048,
      "whereClauseCriteria": "is_deleted = 0"
    }
  ]
}
```

### Redshift UNLOAD

Uses the UNLOAD extraction strategy with an external stage for S3-based data transfer:

```json
{
  "defaultTableConfiguration": {
    "source": {
      "schemaName": "ecommerce_raw",
      "databaseName": "snowconvert_demo"
    },
    "target": {
      "schemaName": "ecommerce_raw",
      "databaseName": "TARGET_DB"
    },
    "extraction": {
      "strategy": "unload",
      "externalStage": "MY_DB.MY_SCHEMA.S3_EXTERNAL_STAGE"
    },
    "loadSegmentation": { "targetSegmentSizeMb": 5000 }
  },
  "tables": [
    {
      "source": { "tableName": "customers" },
      "target": { "tableName": "customers" },
      "columnNamesToPartitionBy": ["customer_id"]
    },
    {
      "source": { "tableName": "orders" },
      "target": { "tableName": "orders" },
      "columnNamesToPartitionBy": ["order_id"],
      "columnTypeMappings": [
        { "sourceType": "NUMERIC(10,2)", "targetType": "DECIMAL(10,2)" }
      ]
    }
  ]
}
```

### Redshift UNLOAD with Iceberg Tables

Combines Redshift UNLOAD with Iceberg table targets, including Snowflake-managed and Glue catalog configurations:

```json
{
  "defaultTableConfiguration": {
    "source": {
      "schemaName": "public",
      "databaseName": "analytics_db"
    },
    "target": {
      "schemaName": "public",
      "databaseName": "TARGET_DB",
      "tableType": "iceberg",
      "icebergConfig": {
        "catalog": "SNOWFLAKE",
        "externalVolume": "my_iceberg_ext_vol",
        "baseLocationPrefix": "migrations/redshift",
        "sourceDataStage": "@TARGET_DB.PUBLIC.ICEBERG_SOURCE_STAGE"
      }
    },
    "extraction": {
      "strategy": "unload",
      "externalStage": "TARGET_DB.PUBLIC.S3_EXTERNAL_STAGE"
    }
  },
  "tables": [
    {
      "source": { "tableName": "customers" },
      "target": { "tableName": "customers" },
      "columnNamesToPartitionBy": ["customer_id"]
    },
    {
      "source": { "tableName": "events" },
      "target": {
        "tableName": "events",
        "tableType": "iceberg",
        "icebergConfig": {
          "catalog": "my_glue_catalog_integration",
          "externalVolume": "my_iceberg_ext_vol",
          "catalogTableName": "glue_db.events"
        }
      },
      "columnNamesToPartitionBy": ["event_id"]
    },
    {
      "source": { "tableName": "orders" },
      "target": {
        "tableName": "orders",
        "tableType": "iceberg",
        "icebergConfig": {
          "catalog": "my_glue_catalog_integration",
          "externalVolume": "my_iceberg_ext_vol",
          "catalogTableName": "glue_db.orders",
          "migrationStrategy": "convert_to_managed"
        }
      },
      "columnNamesToPartitionBy": ["order_id"]
    }
  ]
}
```

### Incremental Sync with Watermark

Uses watermark-based synchronization with modification tracking for incremental data migration:

```json
{
  "defaultTableConfiguration": {
    "source": { "databaseName": "SRC", "schemaName": "dbo" },
    "target": { "databaseName": "TGT", "schemaName": "public" },
    "synchronization": {
      "strategy": "watermark",
      "watermarkColumn": "updated_at"
    }
  },
  "tables": [
    {
      "source": { "tableName": "orders" },
      "target": { "tableName": "orders" },
      "columnNamesToPartitionBy": ["order_id"],
      "primaryKeyColumns": ["order_id"],
      "synchronization": {
        "trackModifications": true
      }
    }
  ]
}
```

### Early stopping (Data Validation)

Enables L3 early stopping globally, with a per-table override using a lower threshold for a known-problematic table. The orchestrator will stop processing remaining partitions for a table once the configured number of mismatches is detected:

```json
{
  "source_platform": "Teradata",
  "target_platform": "Snowflake",
  "validation_configuration": {
    "schema_validation": true,
    "metrics_validation": true,
    "row_validation": true,
    "row_validation_mode": "row",
    "continue_on_failure": true,
    "early_stopping": true,
    "early_stop_mismatch_row_threshold": 1000,
    "early_stop_check_interval_minutes": 5
  },
  "tables": [
    {
      "fully_qualified_name": "my_database.large_fact_table",
      "target_database": "MY_DATABASE",
      "target_schema": "PUBLIC",
      "target_name": "LARGE_FACT_TABLE",
      "column_names_to_partition_by": ["ID"],
      "target_partition_size_mb": 200
    },
    {
      "fully_qualified_name": "my_database.known_problematic_table",
      "target_database": "MY_DATABASE",
      "target_schema": "PUBLIC",
      "target_name": "KNOWN_PROBLEMATIC_TABLE",
      "column_names_to_partition_by": ["ORDER_ID"],
      "validation_configuration": {
        "early_stop_mismatch_row_threshold": 100,
        "early_stop_check_interval_minutes": 1
      }
    }
  ]
}
```
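You can reason about the per-table override above as a key-by-key overlay of the table's `validation_configuration` on the workflow-level one. This merge behaviour is an assumption that matches this example, where all overridden keys are scalars. `effective_validation_config` is a hypothetical helper.

```python
def effective_validation_config(workflow, table):
    """Table-level validation_configuration keys override the workflow-level ones."""
    merged = dict(workflow.get("validation_configuration", {}))
    merged.update(table.get("validation_configuration", {}))
    return merged


workflow = {
    "validation_configuration": {
        "row_validation_mode": "row",
        "early_stopping": True,
        "early_stop_mismatch_row_threshold": 1000,
        "early_stop_check_interval_minutes": 5,
    }
}
problematic = {
    "fully_qualified_name": "my_database.known_problematic_table",
    "validation_configuration": {
        "early_stop_mismatch_row_threshold": 100,
        "early_stop_check_interval_minutes": 1,
    },
}
print(effective_validation_config(workflow, problematic)["early_stop_mismatch_row_threshold"])  # 100
```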

### Hybrid validation with early stopping (Data Validation)

Hybrid mode requires early stopping. The row-hash phase detects mismatched partitions quickly, and once enough are found the remaining partitions are skipped. Only mismatched partitions proceed to cell-level drill-down:

```json
{
  "source_platform": "Teradata",
  "target_platform": "Snowflake",
  "validation_configuration": {
    "schema_validation": true,
    "metrics_validation": true,
    "row_validation": true,
    "row_validation_mode": "hybrid",
    "continue_on_failure": true,
    "early_stopping": true,
    "early_stop_mismatch_row_threshold": 500,
    "early_stop_check_interval_minutes": 2
  },
  "tables": [
    {
      "fully_qualified_name": "my_database.sales_transactions",
      "target_database": "MY_DATABASE",
      "target_schema": "PUBLIC",
      "target_name": "SALES_TRANSACTIONS",
      "column_names_to_partition_by": ["TRANSACTION_ID"],
      "target_partition_size_mb": 200
    }
  ]
}
```
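Because hybrid mode depends on early stopping, you can check a configuration before submitting it. The sketch below encodes only that one documented rule; the orchestrator may enforce more. It treats a missing `early_stopping` key as disabled, which is an assumption. `hybrid_config_errors` is a hypothetical helper.

```python
def hybrid_config_errors(validation_configuration):
    """Flag a hybrid row-validation configuration that does not enable early stopping."""
    cfg = validation_configuration
    if cfg.get("row_validation_mode") == "hybrid" and not cfg.get("early_stopping", False):
        return ["hybrid row validation requires early_stopping to be true"]
    return []


print(hybrid_config_errors({"row_validation_mode": "hybrid", "early_stopping": False}))
# ['hybrid row validation requires early_stopping to be true']
```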

### View validation (Teradata and Redshift)

Validates a source **view** against its Snowflake counterpart using the top-level `views` array (or `object_type: "VIEW"` on a table entry). **Teradata** uses basic L1 schema (existence + datatype) via `HELP COLUMN`. **Redshift** uses full L1 schema via `SVV_COLUMNS`, the same path as tables. **PostgreSQL** uses full L1 schema via `information_schema` / `pg_catalog`, the same path as tables. Cloud DV never CTAS-materializes views on Teradata, Redshift, or PostgreSQL.

For the local **`snowflake-data-validation`** CLI, Teradata, Redshift, and PostgreSQL also skip view materialization so runs match cloud behavior; SQL Server and other sources still materialize views to temporary tables before validation.

The example below is Teradata-specific (`source_platform`, base table + view names). For Redshift, set `source_platform` to `"Redshift"`, use Redshift-style three-part names, and configure the worker’s `[connections.source.redshift]` block as in [Worker Configuration](#worker-configuration). For PostgreSQL, set `source_platform` to `"postgresql"` (or `"postgres"`), use PostgreSQL-style identifiers, and configure `[connections.source.postgresql]`.

#### Teradata example

Validates a Teradata view against its Snowflake counterpart. The `views` array tags entries as `object_type = "VIEW"` automatically. The `tables` entry below is included with all validation disabled — this is useful when the underlying table must exist in the workflow but only the view needs to be validated:

```json
{
  "source_platform": "Teradata",
  "target_platform": "Snowflake",
  "validation_configuration": {
    "schema_validation": true,
    "metrics_validation": true,
    "row_validation": true,
    "row_validation_mode": "cell",
    "continue_on_failure": true,
    "max_failed_rows_number": 100
  },
  "comparison_configuration": {
    "tolerance": 0.001
  },
  "tables": [
    {
      "fully_qualified_name": "my_database.base_table",
      "target_database": "MY_DATABASE",
      "target_schema": "PUBLIC",
      "target_name": "BASE_TABLE",
      "validation_configuration": {
        "schema_validation": false,
        "metrics_validation": false,
        "row_validation": false
      }
    }
  ],
  "views": [
    {
      "fully_qualified_name": "my_database.sales_summary_view",
      "target_database": "MY_DATABASE",
      "target_schema": "PUBLIC",
      "target_name": "SALES_SUMMARY_VIEW",
      "index_column_list": ["ID"],
      "target_index_column_list": ["ID"],
      "partition_column": "ID",
      "target_rows_per_partition": 50000
    }
  ]
}
```

## Worker Configuration

This section documents the configuration for the Worker (`snowflake-data-exchange-agent` package). The Worker configuration file uses [TOML](https://toml.io/) format.

| Section | Property | Type | Description |
|---------|----------|------|-------------|
| **Top Level** | `selected_task_source` | String | Currently should always be set to `"snowflake_stored_procedure"`. |
| `[application]` | `max_parallel_tasks` | Integer | Maximum number of tasks the worker will process in parallel (using threads). |
| `[application]` | `task_fetch_interval` | Integer | Interval (in seconds) between attempts to fetch new tasks from the Orchestrator. |
| `[application]` | `snowflake_database_for_metadata` | String | Optional. Database where the orchestrator deployed the task queue (default `SNOWCONVERT_AI`). Must match the orchestrator's `CUSTOM_SNOWFLAKE_DATABASE_FOR_METADATA` if you override it there. |
| `[application]` | `snowflake_schema_for_data_migration_metadata` | String | Optional. Schema for `PULL_TASKS` / `COMPLETE_TASK` / `FAIL_TASK` (default `DATA_MIGRATION`). Must match the orchestrator's `CUSTOM_SNOWFLAKE_SCHEMA_FOR_DATA_MIGRATION_METADATA` if overridden. |
| `[application]` | `local_results_directory` | String | Optional. Base directory where exported Parquet/CSV files are written before upload. Defaults to `~/.data_exchange_agent/result_data`. |
| `[connections.source.*]` | | Object | Configuration for source system connections. The Worker typically requires an ODBC driver. See examples below. |
| `[connections.target.snowflake_connection_name]` | `connection_name` | String | The name of the connection entry in the `~/.snowflake/config.toml` file to use. |

When `selected_task_source` is `snowflake_stored_procedure`, the worker calls the task-queue procedures using `application.snowflake_database_for_metadata` and `application.snowflake_schema_for_data_migration_metadata`. These values are independent of Snowflake session defaults (`SNOWFLAKE_DATABASE`, `SNOWFLAKE_SCHEMA`) in the connection profile.

**Example: SQL Server (Standard Authentication)**

```toml
[connections.source.sqlserver]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = 1433
```

**Example: Amazon Redshift (IAM Authentication)**

```toml
[connections.source.redshift]
username = "demo-user"
database = "demo_db"
auth_method = "iam-provisioned-cluster"
cluster_id = "my-aws-cluster"
region = "us-west-2"
access_key_id = "your-access-key-id"
secret_access_key = "your-secret-access-key"
```

**Example: Amazon Redshift (Standard Authentication)**

```toml
[connections.source.redshift]
username = "myuser"
password = "mypassword"
database = "mydatabase"
host = "my-cluster.abcdef123456.us-west-2.redshift.amazonaws.com"
port = 5439
auth_method = "standard"
```

**Example: Teradata**

The Worker supports two Teradata drivers and automatically selects the best one available. The pure Python [`teradatasql`](https://pypi.org/project/teradatasql/) driver is preferred; install it with `pip install snowflake-data-exchange-agent[teradata]`. If it is not installed, the Worker falls back to `pyodbc` with the Teradata ODBC driver.

```toml
[connections.source.teradata]
host = "your-teradata-host.example.com"
port = 1025
database = "tpcds"
username = "your_username"
password = "your_password"
# odbc_driver = "Teradata Database ODBC Driver 17.20"  # only needed when teradatasql is not installed (ODBC fallback)
# dbc_name = "TDPID_ALIAS"  # optional; defaults to host
# authentication = "LDAP"  # optional ODBC AuthMech when using pyodbc (e.g. TD2, LDAP, KRB5)
```
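The driver preference described above can be sketched with `importlib.util.find_spec`, which checks whether a module is importable without importing it. This illustrates the selection order and is not the Worker's code. `select_teradata_driver` is a hypothetical helper. Its `is_installed` parameter exists only so the logic can be exercised without either driver installed.

```python
import importlib.util


def module_available(name):
    """True if the module can be imported in the current environment."""
    return importlib.util.find_spec(name) is not None


def select_teradata_driver(is_installed=module_available):
    """Prefer the pure-Python teradatasql driver, falling back to pyodbc + Teradata ODBC."""
    if is_installed("teradatasql"):
        return "teradatasql"
    if is_installed("pyodbc"):
        return "pyodbc"  # the odbc_driver setting applies in this fallback case
    raise RuntimeError(
        "no Teradata driver found: pip install snowflake-data-exchange-agent[teradata]"
    )
```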

> **Note:** Only one source connection is needed. The Snowflake target connection should point to a valid entry in your `~/.snowflake/config.toml`.

## Changelog

### v0.10.0

**New features**

- Added view validation support to the Cloud Data Validation pipeline.
- Added `DATA_MIGRATION_WORKFLOW` and `DATA_VALIDATION_WORKFLOW` views with workflow-type filtering and per-workflow L1/L2/L3 progress rollups.
- Added Oracle ODBC source support.
- Added Teradata type mappings and Snowflake target fully-qualified-name helpers.
- Added a Teradata orchestrator platform module.
- Added Teradata `object_type` query in the shared dispatcher.
- Added Teradata to the data-migration-orchestrator workflow.
- Added Teradata ODBC support to the data-exchange-agent.
- Added Oracle source platform with `ALL_` schema discovery, type mappings, and preprocessing hooks.
- Added early-stopping support for L3 row-hashing validation.
- Added support for custom metrics and templates in Cloud Data Validation.
- Added Oracle Data Validation foundation with L1 schema validation.
- Added Oracle catalog type mappings and strict `require_type_mapping` enforcement.
- Added L3 row and cell MD5 validation for Oracle.
- Added the `teradata` optional install extra (`pip install snowflake-data-exchange-agent[teradata]`).
- Added support for early stopping, hybrid L3, and Snowpipe (breaking change).
- Added per-table progress percentage, elapsed time, and ETA to the Data Validation dashboard.
- Added Snowflake schema utilities and type-mapping updates for the orchestrator.
- Added TPT and `WRITE_NOS` data sources for Teradata extraction.
- Added extraction-stage validation and workflow configuration.
- Added TPT and `WRITE_NOS` integration in Teradata workflow tasks.
- Added a metrics skill and PostgreSQL metrics templates.
- Added PostgreSQL connector with L0 and L1 validation.
- Added Redshift view validation.
- Added Oracle as a supported Data Validation source across the orchestrator and agent.
- Added L2 and L3 row and cell validation for PostgreSQL.
- **Cloud Data Validation**: PostgreSQL is now a supported source platform (`postgresql`, `postgres`, `pg`). The orchestrator and DEA resolve SDV `Platform.POSTGRESQL` and templates under `postgresql/`; hybrid and evaluate handlers fold schema column names to lowercase like Redshift. Local SDV skips CTAS view materialization for PostgreSQL (with Teradata, Redshift, and Oracle) so CLI view runs align with cloud DV. View entries still use the top-level `views` array or `object_type: "VIEW"` (see [View validation](#view-validation-teradata-and-redshift)); Teradata views remain basic L1 via `HELP COLUMN`.

**Improvements**

- Optimized L3 row-hashing queries.
- Extended Teradata ODBC connection configuration.

**Bug fixes**

- Deduplicated Redshift catalog rows in the planner for `DISTSTYLE=ALL` and `AUTO(SORTKEY)` cases.
- Prevented out-of-memory errors in cell-by-cell and row-hashing comparisons in workers.
- Fixed L3 row-hashing producing false positives.
- Prevented unnecessary shared-cache eviction when the loaded copy already matches the workspace.
- Fixed row-hashing algorithm errors; duplicate rows and missing rows are now reported separately (breaking change).

### v0.9.1

**Improvements**

- Improved Cloud Data Validation column metrics performance by consolidating per-column CTEs into a single wide-row query.

**Bug fixes**

- Fixed aggregate overflow on `STDDEV` and `VARIANCE` in Cloud Data Validation by casting `SUM`/`AVG`/`STDDEV` inputs to `FLOAT`; removed the `VARIANCE` metric.
- Fixed migration 0015 failing on Snowflake accounts that do not support `ALTER DATABASE ... SET EVENT_TABLE`; the event table is now set only when a Snowpipe-based data migration starts.
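The two v0.9.1 changes above concern how column metrics are queried. The Python sketch below builds one wide-row `SELECT` that computes every column's metrics in a single pass, instead of one CTE per column, and casts the `SUM`/`AVG`/`STDDEV` inputs to `FLOAT`, with no `VARIANCE` metric. The function name `wide_metrics_query` and the inclusion of `MIN`/`MAX` are assumptions for illustration; this is not the orchestrator's actual template, and it does not quote or escape identifiers.

```python
def wide_metrics_query(table: str, numeric_columns: list[str]) -> str:
    """Build a single SELECT returning one wide row of per-column metrics.

    Hypothetical sketch: identifiers are interpolated as-is (no quoting).
    """
    select_items: list[str] = []
    for col in numeric_columns:
        # Casting aggregate inputs to FLOAT avoids fixed-point overflow
        # on large SUM/AVG/STDDEV results.
        as_float = f"CAST({col} AS FLOAT)"
        select_items += [
            f"MIN({col}) AS {col}_min",
            f"MAX({col}) AS {col}_max",
            f"SUM({as_float}) AS {col}_sum",
            f"AVG({as_float}) AS {col}_avg",
            f"STDDEV({as_float}) AS {col}_stddev",
        ]
    # One scan of the table yields all metrics as columns of a single row.
    return f"SELECT {', '.join(select_items)} FROM {table}"


print(wide_metrics_query("orders", ["amount", "qty"]))
```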

### v0.9.0

**Improvements**

- Added vertical partitioning for cell validation on wide tables.
- Added granular per-table L1/L2/L3 progress to the validation dashboard.
- Schema validation now checks the table-level `ROW_COUNT`.
- Data Validation Snowpipes are now created synchronously; restored the `task_queue` argument in the hybrid validation handler.
- Added table-centric Streamlit dashboards with CSV export.

**Bug fixes**

- Fixed timestamp copy handling for SQL Server BCP loads.
- Fixed duplicate tasks created when evaluating L1 results under race conditions.
- Fixed decimal partition coercion and issues in parallelized L3 validation.
- Fixed orchestrator logging to `stderr` instead of `stdout`.

### v0.8.0

**New features**

- Added a hybrid row validation mode: a two-phase `MD5` comparison followed by cell-level drilldown.
- Added support for smart partitioning in Cloud Data Validation.
- Added load segmentation for multi-file `COPY INTO`.
- Added `DEFAULT` normalization templates for various data types.
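The hybrid row validation mode listed above can be sketched in Python as a generic two-phase comparison: phase 1 compares one `MD5` hash per row keyed by a primary key, and phase 2 drills down cell by cell only into rows whose hashes differ. `row_md5` and `hybrid_compare` are hypothetical names; the sketch runs in memory on small lists of dicts, assumes a unique key column, and is not the orchestrator's implementation.

```python
import hashlib

Row = dict[str, object]


def row_md5(row: Row, columns: list[str]) -> str:
    """Hash a row's values in a fixed column order; NULLs get a sentinel."""
    parts = ["\x00NULL" if row[c] is None else str(row[c]) for c in columns]
    return hashlib.md5("\x1f".join(parts).encode("utf-8")).hexdigest()


def hybrid_compare(source: list[Row], target: list[Row], key: str,
                   columns: list[str]) -> dict:
    # Assumes `key` is unique on each side; duplicates would be collapsed.
    src = {r[key]: r for r in source}
    tgt = {r[key]: r for r in target}

    # Phase 1: compare one hash per row; only mismatched keys go to phase 2.
    mismatched = [
        k for k in src.keys() & tgt.keys()
        if row_md5(src[k], columns) != row_md5(tgt[k], columns)
    ]

    # Phase 2: cell drilldown, limited to rows whose hashes differ.
    cell_diffs = [
        (k, c, src[k][c], tgt[k][c])
        for k in sorted(mismatched)
        for c in columns
        if src[k][c] != tgt[k][c]
    ]
    return {
        "missing_in_target": sorted(src.keys() - tgt.keys()),
        "extra_in_target": sorted(tgt.keys() - src.keys()),
        "cell_diffs": cell_diffs,
    }


source = [{"id": 1, "name": "a", "amt": 10},
          {"id": 2, "name": "b", "amt": 20},
          {"id": 3, "name": "c", "amt": 30}]
target = [{"id": 1, "name": "a", "amt": 10},
          {"id": 2, "name": "b", "amt": 25},
          {"id": 4, "name": "d", "amt": 40}]
result = hybrid_compare(source, target, "id", ["name", "amt"])
print(result)
```

The design point is that the cheap hash comparison touches every row once, while the more expensive cell comparison runs only on the rows that actually differ.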

**Improvements**

- Improved validation of result set snapshots.
- Added a Table Progress tab to the Streamlit dashboard for Data Validation.
- Improved Data Validation performance.
- Included thread name and ID in log output for easier troubleshooting.
- Improved the task queue to support a higher number of parallel workers.

**Bug fixes**

- Cloud Data Validation now defaults to `False` when its value is null in the workflow config.
- Fixed SQL compilation memory exhaustion by batching L2 metrics queries for wide tables.
- Fixed cell validation key mismatch, partition `WHERE` clause handling, and metrics payload cleanup.
- Fixed duplicate rows caused by an orchestrator crash during `COPY INTO`.
- Fixed an issue with the incremental sync watermark on Redshift.
- Fixed usage of the vectorized scanner.

### v0.7.2

**Improvements**

- Log package and Python versions when the orchestrator starts.

**Bug fixes**

- Snowflake: recover cleanly when a session expires instead of staying stuck in a bad state.
- Task queue: avoid unblocking tasks before predecessor rows exist (stored procedures and schema migration).
- Improve generation of data validation queries for Teradata: handle very large counts and optional `WHERE` filters correctly.

### v0.7.1

**Improvements**

- Cloud Data Validation dashboard: paginate large tab views for easier browsing.
- Cloud validation: load large query results in smaller batches for the dashboard and query helpers.

**Bug fixes**

- Data migration partition planning no longer caps the number of partitions with a fixed maximum.
- Snowflake connectivity: more resilient sessions and automatic retry for transient failures.
