CLI Reference v0.1.0

datapilot CLI - dp

A command-line tool for ingesting, profiling, preprocessing, and exporting data across PostgreSQL, MySQL, S3, Kafka, REST APIs, and local files.

Python ≥ 3.11 · Polars · PyArrow · asyncio-native · entry point: dp

Installation

Install the package in editable mode from the project root. The dp entry point is registered automatically via pyproject.toml.

bash
# Install with dev dependencies
pip install -e ".[dev]"

# Verify
dp --help

Quick Start

The minimal path from a CSV file to a profiled, preprocessed, exported dataset.

  1. Ingest a local CSV file into the artifact store.
  2. Classify columns to understand the semantic shape of the dataset.
  3. Profile the ingested dataset to understand its structure.
  4. Apply preprocessing transformations.
  5. Export the result to Parquet or write back to a database.
bash
dp ingest --source local_file --path data.csv
# -> Run ID:  abc123
# -> Artifact: abc123_ingest_output

dp classify --input abc123 --mode hybrid

dp profile --input abc123

dp preprocess --input abc123 --pipeline pipeline.json
# -> Run ID:  def456
# -> Output:  def456_preprocess_output

dp export --input def456 --format parquet --out-path output/result.parquet

Artifact System

Every pipeline command saves its outputs as artifacts - files stored in the artifact store directory (default: src/datapilot/artifacts). Understanding artifacts is essential for chaining commands together.

Run ID vs Artifact ID

Each pipeline run generates a short run ID (e.g. f1662de6). The run produces one or more artifacts, each named <run_id>_<suffix>.

Example - preprocess run:

- Run ID: f1662de6
- Output: f1662de6_preprocess_output (the data)
- Config: f1662de6_preprocess_config (step config snapshot)

Passing input to commands

Any command that takes --input accepts either the run ID or a full artifact ID. When you pass a run ID, the system picks the most relevant artifact automatically based on the target command.

| What you pass | How it resolves |
|---|---|
| Run ID only (f1662de6) | Selects the best artifact for the target command - e.g. profile and classify prefer ingest_output, export prefers preprocess_output |
| Full artifact ID (f1662de6_ingest_output) | Uses that exact artifact; no resolution needed |

Use dp list to see all artifacts in the store along with their run IDs, features, and creation timestamps.
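
The resolution behaviour can be sketched as follows; PREFERRED_SUFFIX and resolve_input are illustrative names, not the actual implementation:

```python
# Illustrative sketch of run-ID -> artifact resolution (names are hypothetical).
PREFERRED_SUFFIX = {
    "profile": ["ingest_output", "preprocess_output"],
    "classify": ["ingest_output", "preprocess_output"],
    "export": ["preprocess_output", "ingest_output"],
}

def resolve_input(value: str, command: str, store: list[str]) -> str:
    """Return the artifact ID to load for `value` passed to `command`."""
    if value in store:                        # full artifact ID: use it verbatim
        return value
    for suffix in PREFERRED_SUFFIX[command]:  # run ID: pick the preferred suffix
        candidate = f"{value}_{suffix}"
        if candidate in store:
            return candidate
    raise FileNotFoundError(f"no artifact found for run {value!r}")

store = ["f1662de6_ingest_output", "f1662de6_preprocess_output"]
```

With that sketch, passing the bare run ID to export would resolve to the preprocess output, while profile would resolve to the ingest output.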

Artifact types by feature

| Feature | Artifacts produced | Format |
|---|---|---|
| ingest | <run_id>_ingest_output, <run_id>_ingest_schema | Parquet, JSON |
| profile | <profile_id>_detail, <profile_id>_summary | JSON |
| preprocess | <run_id>_preprocess_output, <run_id>_preprocess_config | Parquet, JSON |
| classify | <run_id>_classify_output | JSON |

Docker Services

All external services (PostgreSQL, MySQL, MinIO, Kafka) are defined in docker-compose.test.yml.

bash
# Start all services
docker compose -f docker-compose.test.yml up -d

# Check health status
docker compose -f docker-compose.test.yml ps

| Service | Image | Port | Credentials |
|---|---|---|---|
| postgres | postgres:16-alpine | 5433 | testuser / testpass / testdb |
| mysql | mysql:8-debian | 3307 | testuser / testpass / testdb |
| minio | minio/minio:latest | 9000 / 9001 | minioadmin / minioadmin |
| kafka | apache/kafka:3.7.0 | 9092 | - |

Seeding Test Data

On Windows with Git Bash, use docker compose exec -T with -c or -e flags instead of heredoc. Avoid /tmp paths as Git Bash maps them to Windows temp directories.

PostgreSQL

bash
docker compose -f docker-compose.test.yml exec -T postgres \
  psql -U testuser -d testdb \
  -c "
CREATE TABLE IF NOT EXISTS orders (
    id SERIAL PRIMARY KEY,
    product_name VARCHAR(100),
    category VARCHAR(50),
    price NUMERIC(10,2),
    quantity INTEGER,
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO orders (product_name, category, price, quantity, status)
SELECT
    'Product_' || i,
    (ARRAY['Electronics','Clothing','Food','Books'])[1 + (i % 4)],
    round((random() * 1000)::numeric, 2),
    (random() * 100)::int,
    (ARRAY['pending','shipped','delivered','cancelled'])[1 + (i % 4)]
FROM generate_series(1, 50000) i;
"

MySQL

bash
docker compose -f docker-compose.test.yml exec -T mysql \
  mysql -u testuser -ptestpass testdb \
  -e "
CREATE TABLE IF NOT EXISTS products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100), category VARCHAR(50),
    price DECIMAL(10,2), stock INT
);
INSERT INTO products (name, category, price, stock)
SELECT CONCAT('Product_', seq),
    ELT(1+(seq%4),'Electronics','Clothing','Food','Books'),
    ROUND(RAND()*1000,2), FLOOR(RAND()*100)
FROM ( SELECT a.N+b.N*10+c.N*100+d.N*1000+1 AS seq
  FROM (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
        UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
       (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
        UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) b,
       (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
        UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) c,
       (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
        UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) d
  LIMIT 10000) t;
"

MinIO - Upload a file

bash
# Uses Python (boto3) to avoid Windows path issues with mc CLI
python - << 'EOF'
import boto3
s3 = boto3.client("s3", aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin", endpoint_url="http://localhost:9000",
    region_name="us-east-1")
s3.upload_file("tests/fixtures/data.csv", "testbucket", "input/data.csv")
print("Upload OK")
EOF

Kafka - Produce test messages

bash
# Create topic first (use // prefix on Git Bash to prevent path conversion)
docker compose -f docker-compose.test.yml exec -T kafka \
  //opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic test-topic \
  --partitions 1 --replication-factor 1

# Produce 100 JSON messages
python - << 'EOF'
import asyncio, json
from aiokafka import AIOKafkaProducer

async def produce():
    p = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await p.start()
    for i in range(100):
        msg = json.dumps({"id": i, "product_name": f"Product_{i}",
            "category": ["Electronics","Clothing","Food","Books"][i%4],
            "price": round(i*10.5,2), "quantity": i%50}).encode()
        await p.send("test-topic", msg)
    await p.stop()
    print("Produced 100 messages")

asyncio.run(produce())
EOF

dp connector

Interact directly with a data source without going through a full pipeline. Useful for verifying connectivity, inspecting schema, and performing one-off operations like uploads, truncates, or Kafka message production before committing to a full ingest run.

| Action | Supported sources | Description |
|---|---|---|
| test | all | Verify connectivity and measure latency |
| schema | all | Inspect column names, types, and row estimate |
| upload | s3, local_file | Upload a local file to S3 or copy to a local destination |
| download | s3 | Download an S3 object to a local path |
| list | s3 | List objects in a bucket with optional prefix filter |
| exec | postgresql, mysql | Execute a raw SQL statement (DELETE, UPDATE, etc.) |
| truncate | postgresql, mysql | Truncate a table |
| produce | kafka | Send rows from a JSON/CSV file to a Kafka topic |

test

bash
dp connector test --source postgresql --config tests/fixtures/pg_config.json
dp connector test --source mysql       --config tests/fixtures/mysql_config.json
dp connector test --source s3          --config tests/fixtures/s3_config.json
dp connector test --source kafka       --config tests/fixtures/kafka_config.json
dp connector test --source rest        --config tests/fixtures/rest_config.json

schema

bash
dp connector schema --source postgresql --config tests/fixtures/pg_config.json --table orders
dp connector schema --source mysql       --config tests/fixtures/mysql_config.json --table products
dp connector schema --source s3          --config tests/fixtures/s3_config.json --url s3://testbucket/input/data.csv
dp connector schema --source local_file  --path tests/fixtures/data.csv

upload

bash
dp connector upload \
  --source   s3 \
  --config   tests/fixtures/s3_config.json \
  --src-path tests/fixtures/data.csv \
  --dest-url s3://testbucket/upload/data.csv

download

bash
dp connector download \
  --source   s3 \
  --config   tests/fixtures/s3_config.json \
  --url      s3://testbucket/input/data.csv \
  --out-path output/downloaded.csv

list

bash
dp connector list --source s3 --config tests/fixtures/s3_config.json
dp connector list --source s3 --config tests/fixtures/s3_config.json --prefix input/

exec

bash
dp connector exec \
  --source postgresql \
  --config tests/fixtures/pg_config.json \
  --sql "DELETE FROM orders WHERE status='cancelled'"

dp connector exec \
  --source mysql \
  --config tests/fixtures/mysql_config.json \
  --sql "UPDATE products SET stock=0 WHERE price > 900"

truncate

bash
dp connector truncate --source postgresql --config tests/fixtures/pg_config.json --table orders_processed
dp connector truncate --source mysql       --config tests/fixtures/mysql_config.json --table products_processed

produce

bash
dp connector produce \
  --source     kafka \
  --config     tests/fixtures/kafka_config.json \
  --topic      test-topic \
  --file       tests/fixtures/data.csv

dp connector produce \
  --source     kafka \
  --config     tests/fixtures/kafka_config.json \
  --topic      test-topic \
  --file       tests/fixtures/data.csv \
  --key-column id

dp ingest

Reads data from a connector in streaming batches and persists it as a Parquet artifact in the artifact store. Also saves a schema snapshot (column names, types, row count) as a companion JSON artifact. The resulting artifact ID is what you pass to classify, profile, preprocess, or export.

| Option | Default | Description |
|---|---|---|
| --source, -s | required | Connector type: local_file \| postgresql \| mysql \| s3 \| rest \| kafka |
| --config, -c | - | Path to connector JSON config file |
| --path | - | Local file path (local_file) |
| --table | - | Table name (postgresql \| mysql) |
| --url | - | S3 URL, e.g. s3://bucket/key.parquet |
| --topic | - | Kafka topic name |
| --endpoint | - | REST endpoint path |
| --out, -o | src/datapilot/artifacts | Artifact store directory |
| --limit, -n | - | Max rows to read |
| --batch-size | 50 000 | Rows per streaming batch |
| --max-records | - | Max messages consumed (kafka) |

bash
dp ingest --source local_file --path tests/fixtures/data.csv
dp ingest --source postgresql --config tests/fixtures/pg_config.json --table orders
dp ingest --source mysql --config tests/fixtures/mysql_config.json --table products
dp ingest --source s3 --config tests/fixtures/s3_config.json --url s3://testbucket/input/data.csv
dp ingest --source rest --config tests/fixtures/rest_config.json --endpoint posts
dp ingest --source kafka --config tests/fixtures/kafka_config.json --topic test-topic --max-records 100

dp profile

Computes descriptive statistics for every column and saves two artifacts: a detailed JSON (histograms, percentiles, top values) and a lighter summary JSON (null rates, distinct counts, warnings). A column-summary table is printed to the terminal at the end of the run.
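
As a sketch of what the summary statistics mean, here is the null-rate and distinct-count arithmetic in plain Python (column_summary is a hypothetical helper, not datapilot code):

```python
# Per-column summary arithmetic: null rate and distinct count,
# shown in plain Python for illustration only.
def column_summary(values: list) -> dict:
    n = len(values)
    nulls = sum(1 for v in values if v is None)
    distinct = len({v for v in values if v is not None})
    return {
        "rows": n,
        "null_rate": nulls / n if n else 0.0,
        "distinct": distinct,
    }

summary = column_summary(["a", "b", "a", None])
```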

| Option | Default | Description |
|---|---|---|
| --input, -i | required | Run ID or full artifact ID |
| --mode, -m | full | full \| summary |
| --sample-strategy | none | none \| random \| reservoir |
| --sample-size | 100 000 | Rows to sample when strategy ≠ none |
| --correlation | pearson | pearson \| spearman \| none |
| --out, -o | src/datapilot/artifacts | Artifact store directory |

bash
dp profile --input 72e0548a
dp profile --input 72e0548a --mode full --sample-strategy random --sample-size 10000 --correlation pearson
dp profile --input output/result.parquet --mode summary

dp preprocess

Applies a sequence of transformation steps defined in a JSON pipeline file. Use --dry-run to validate on the first 1 000 rows before committing. Use --checkpoint to save intermediate Parquet files after each step.

| Option | Default | Description |
|---|---|---|
| --input, -i | required | Run ID or full artifact ID |
| --pipeline, -p | required | Path to pipeline JSON config file |
| --dry-run | false | Run on first 1 000 rows only - no artifact saved |
| --checkpoint | false | Save a Parquet file after each step |
| --out, -o | src/datapilot/artifacts | Artifact store directory |

Pipeline JSON format

json - tests/fixtures/preprocess_pipeline.json
{
  "steps": [
    {"type": "impute_mean",    "scope": {"columns": ["price", "quantity"]}},
    {"type": "clip_iqr",       "scope": {"columns": ["price"]}},
    {"type": "standard_scaler", "scope": {"columns": ["price", "quantity"]}}
  ]
}
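
The math behind the impute_mean and standard_scaler steps above can be sketched in plain Python (the real pipeline runs on Polars; these helpers are illustrative only):

```python
import statistics

def impute_mean(col):
    """Replace None with the mean of the non-null values."""
    mean = statistics.fmean(v for v in col if v is not None)
    return [mean if v is None else v for v in col]

def standard_scale(col):
    """Shift to zero mean, scale by the sample standard deviation."""
    mean = statistics.fmean(col)
    std = statistics.stdev(col)
    return [(v - mean) / std for v in col]

prices = impute_mean([10.0, None, 30.0])  # -> [10.0, 20.0, 30.0]
scaled = standard_scale(prices)           # -> [-1.0, 0.0, 1.0]
```
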
bash
dp preprocess --input 72e0548a --pipeline tests/fixtures/preprocess_pipeline.json --dry-run
dp preprocess --input 72e0548a --pipeline tests/fixtures/preprocess_pipeline.json --checkpoint

dp export

Writes a dataset artifact to a local file or pushes it back to a database or S3. Supports three write modes - replace, append, and upsert.

| Option | Default | Description |
|---|---|---|
| --input, -i | required | Run ID or full artifact ID |
| --format, -f | required | csv \| parquet \| json \| jsonl \| excel |
| --out-path | - | Destination file path |
| --connector, -c | - | Write-back connector config JSON |
| --write-mode | replace | replace \| append \| upsert |
| --primary-keys | - | Comma-separated key columns for upsert |
| --compression | - | snappy \| zstd \| gzip (parquet only) |
| --dry-run | false | Print first 10 rows, skip write |
| --out, -o | src/datapilot/artifacts | Artifact store directory |

bash
dp export --input f1662de6 --format parquet --out-path output/result.parquet
dp export --input f1662de6 --format csv --out-path output/result.csv
dp export --input f1662de6 --format parquet \
  --connector tests/fixtures/pg_write_config.json --write-mode upsert --primary-keys id
append + primary key constraint: Using --write-mode append on a table with a primary key will fail if duplicate IDs exist. Use upsert with --primary-keys instead.

dp list

Lists artifacts stored in the artifact store, sorted by creation time (newest first).

| Option | Default | Description |
|---|---|---|
| --out, -o | src/datapilot/artifacts | Artifact store directory |
| --feature, -f | - | Filter: ingest \| profile \| preprocess \| classify |
| --limit, -n | 20 | Max number of artifacts to display |

bash
dp list
dp list --feature ingest
dp list --feature classify
dp list --feature preprocess --limit 50

dp classify

Inspects every column in an ingested or preprocessed dataset and assigns it a semantic type - such as identifier, numerical_continuous, or categorical_nominal - along with a confidence score and the reasoning behind the decision.

Three classification modes are available. rule_based is instant and requires no additional dependencies; it uses regex patterns against column names and Polars dtype heuristics. embedding uses a local sentence-transformers model (all-MiniLM-L6-v2) to compare each column's semantic context against typed anchor phrases. hybrid (default) runs rule-based first and only falls back to the embedding model for columns that return unknown or a confidence below 0.65 - keeping the run fast while still handling ambiguous columns accurately.
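
The hybrid fallback logic can be sketched as follows; RULES, rule_classify, and embed_classify are simplified stand-ins for the real rule set and model call:

```python
import re

# Minimal sketch of the hybrid strategy: rules first, embedding fallback
# for columns that come back unknown or below the confidence cut-off.
RULES = [
    (re.compile(r"(^id$|_id$|uuid)"), ("identifier", 0.65)),
    (re.compile(r"(price|amount|cost)"), ("numerical_continuous", 0.68)),
]

def rule_classify(name: str):
    for pattern, result in RULES:
        if pattern.search(name):
            return result
    return ("unknown", 0.0)

def embed_classify(name: str):
    # Stand-in for the sentence-transformers similarity lookup.
    return ("categorical_nominal", 0.70)

def hybrid_classify(name: str, fallback_below: float = 0.65):
    sem_type, conf = rule_classify(name)
    if sem_type == "unknown" or conf < fallback_below:
        return embed_classify(name)
    return (sem_type, conf)
```

A column like price never touches the model, while an ambiguous name like status falls through to the embedding path.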

The classify output is a JSON artifact. It is not Parquet and cannot be passed to preprocess or export. Use it as a reference when deciding which columns belong in numeric pipeline steps.

| Option | Default | Description |
|---|---|---|
| --input, -i | required | Run ID or full artifact ID from a previous ingest or preprocess |
| --mode, -m | hybrid | rule_based - regex + dtype heuristics only (instant, no model); embedding - semantic similarity via sentence-transformers; hybrid - rule_based first, embedding for ambiguous columns |
| --threshold, -t | 0.0 | Minimum confidence to keep a classification. Columns below the threshold are recorded as unknown. Range: 0.0 – 1.0 |
| --overrides | - | JSON string to manually pin columns to a semantic type, bypassing classification entirely, e.g. '{"y": "target_label"}' |
| --out, -o | src/datapilot/artifacts | Artifact store directory |

Usage

bash
# Default: hybrid mode, no threshold filter
dp classify --input cf78c178

# Instant rule-based only, drop low-confidence results
dp classify --input cf78c178 --mode rule_based --threshold 0.65

# Full embedding pass (all columns go through the model)
dp classify --input cf78c178 --mode embedding

# Override specific columns, let the rest be classified normally
dp classify --input cf78c178 --mode hybrid \
  --overrides '{"status": "categorical_ordinal", "y": "target_label"}'

# Use a full artifact ID instead of a run ID
dp classify --input cf78c178_ingest_output --mode hybrid

After a successful run, a classification table is printed and the artifact ID is shown:

output
[OK] Classify complete in 38.8s - 7 columns classified
  Artifact: f9374114_classify_output
  Columns:  7

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Column       ┃ Semantic Type        ┃ Confidence ┃ Source     ┃ Overridden ┃ Reasoning                                      ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ id           │ identifier           │       0.65 │ rule_based │ no         │ column name matches identifier pattern         │
│ product_name │ categorical_nominal  │       0.70 │ embedding  │ no         │ embedding similarity 0.263 to categorical_…    │
│ category     │ categorical_nominal  │       0.70 │ embedding  │ no         │ embedding similarity 0.422 to categorical_…    │
│ price        │ numerical_continuous │       0.68 │ rule_based │ no         │ column name matches financial numeric pattern  │
│ quantity     │ numerical_discrete   │       0.65 │ rule_based │ no         │ column name matches discrete count pattern     │
│ status       │ categorical_nominal  │       0.70 │ embedding  │ no         │ embedding similarity 0.286 to categorical_…    │
│ created_at   │ datetime             │       0.70 │ rule_based │ no         │ polars dtype is Datetime(…)                    │
└──────────────┴──────────────────────┴────────────┴────────────┴────────────┴────────────────────────────────────────────────┘

Confidence colour coding

The Confidence column is colour-coded in the terminal: green ≥ 0.70 · yellow 0.55 – 0.69 · red < 0.55. Overridden columns always show confidence 1.00 and source override.

First-run model download: embedding and hybrid modes download all-MiniLM-L6-v2 (~90 MB) on first use from the Hugging Face Hub. Set the HF_TOKEN environment variable to enable higher rate limits and avoid throttling. Subsequent runs load the model from the local cache and complete significantly faster.

Use the classification result to decide which columns go into which preprocess steps. Columns classified as numerical_continuous or numerical_discrete are safe candidates for impute_mean, clip_iqr, and standard_scaler. Columns classified as identifier, datetime, or text_freeform should generally be excluded from numeric steps.
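
A classify artifact can be post-processed to pick the numeric columns for a pipeline; the artifact shape below is assumed for illustration only:

```python
# Filter a classify result down to the columns that are safe candidates
# for numeric preprocess steps. The dict shape here is an assumption.
NUMERIC_TYPES = {"numerical_continuous", "numerical_discrete"}

classify_output = {
    "id": {"semantic_type": "identifier"},
    "price": {"semantic_type": "numerical_continuous"},
    "quantity": {"semantic_type": "numerical_discrete"},
    "created_at": {"semantic_type": "datetime"},
}

numeric_cols = sorted(
    col for col, info in classify_output.items()
    if info["semantic_type"] in NUMERIC_TYPES
)
```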

dp run

Runs an ingest followed immediately by a profile in a single command, driven by a JSON config file.

Config file format

json - tests/fixtures/run_config_pg.json
{
  "source": "postgresql",
  "connector": {
    "host": "localhost", "port": 5433,
    "database": "testdb", "user": "testuser", "password": "testpass"
  },
  "query":   { "table": "orders" },
  "ingest":  { "batch_size": 10000 },
  "profile": {
    "mode": "full", "sample_strategy": "random",
    "sample_size": 10000, "correlation": "pearson"
  }
}
bash
dp run tests/fixtures/run_config_pg.json
dp run tests/fixtures/run_config_pg.json --out /tmp/dp_artifacts

Walkthrough - PostgreSQL end-to-end

bash
# 1. Verify connection
dp connector test   --source postgresql --config tests/fixtures/pg_config.json
dp connector schema --source postgresql --config tests/fixtures/pg_config.json --table orders

# 2. Ingest
dp ingest --source postgresql --config tests/fixtures/pg_config.json --table orders
# -> Run ID: 72e0548a

# 3. Profile
dp profile --input 72e0548a --sample-strategy random --sample-size 10000

# 4. Preprocess
dp preprocess \
  --input    72e0548a \
  --pipeline tests/fixtures/preprocess_pipeline.json \
  --checkpoint
# -> Run ID: f1662de6

# 5a. Export to Parquet
dp export --input f1662de6 --format parquet --out-path output/result.parquet

# 5b. Or write back to PostgreSQL
dp export --input f1662de6 --format parquet \
  --connector tests/fixtures/pg_write_config.json \
  --write-mode upsert --primary-keys id

Walkthrough - MySQL end-to-end

bash
dp connector test   --source mysql --config tests/fixtures/mysql_config.json
dp connector schema --source mysql --config tests/fixtures/mysql_config.json --table products

dp ingest --source mysql --config tests/fixtures/mysql_config.json --table products
# -> Run ID: d7859e9e

dp profile --input d7859e9e

dp export --input d7859e9e --format parquet \
  --connector tests/fixtures/mysql_write_config.json --write-mode replace

Walkthrough - S3 / MinIO

bash
dp connector test --source s3 --config tests/fixtures/s3_config.json
dp connector upload --source s3 --config tests/fixtures/s3_config.json \
  --src-path tests/fixtures/data.csv --dest-url s3://testbucket/input/data.csv
dp connector list --source s3 --config tests/fixtures/s3_config.json --prefix input/
dp ingest --source s3 --config tests/fixtures/s3_config.json --url s3://testbucket/input/data.csv
dp export --input <run_id> --format parquet --connector tests/fixtures/s3_write_config.json
dp connector download --source s3 --config tests/fixtures/s3_config.json \
  --url s3://testbucket/output/processed.parquet --out-path output/processed.parquet

Walkthrough - Kafka

GroupCoordinatorNotAvailableError warnings are normal in single-node KRaft mode and do not affect ingestion results.
bash
docker compose -f docker-compose.test.yml exec -T kafka \
  //opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic test-topic --partitions 1 --replication-factor 1

dp connector produce --source kafka \
  --config tests/fixtures/kafka_config.json \
  --topic  test-topic \
  --file   tests/fixtures/data.csv

dp ingest --source kafka \
  --config      tests/fixtures/kafka_config.json \
  --topic       test-topic \
  --max-records 100

Walkthrough - REST API

On Git Bash, never use a leading / in --endpoint. Write posts instead of /posts.
bash
dp connector test   --source rest --config tests/fixtures/rest_config.json
dp connector schema --source rest --config tests/fixtures/rest_config.json
dp ingest --source rest --config tests/fixtures/rest_config.json --endpoint posts
dp ingest --source rest --config tests/fixtures/rest_pagination_config.json --endpoint products

Walkthrough - classify after ingest

Run dp classify right after ingest to understand the semantic shape of your dataset before designing a preprocess pipeline. Use the results to decide which columns to include in each step.

bash
# 1. Ingest
dp ingest --source postgresql --config tests/fixtures/pg_config.json --table orders
# -> Run ID: cf78c178

# 2. Classify - hybrid mode (recommended)
dp classify --input cf78c178 --mode hybrid
# -> Artifact: f9374114_classify_output
# price -> numerical_continuous  (safe: impute_mean, clip_iqr, standard_scaler)
# quantity -> numerical_discrete  (safe: impute_median)
# category, status -> categorical_nominal  (skip numeric steps)
# id -> identifier  (exclude from all transformations)
# created_at -> datetime  (exclude from numeric steps)

# 2b. Rule-based only - instant, no model required
dp classify --input cf78c178 --mode rule_based --threshold 0.65

# 2c. Override a column when you know better than the classifier
dp classify --input cf78c178 --mode hybrid \
  --overrides '{"status": "categorical_ordinal"}'

# 3. Preprocess - informed by classify output above
dp preprocess \
  --input    cf78c178 \
  --pipeline tests/fixtures/preprocess_pipeline.json \
  --dry-run

dp preprocess \
  --input    cf78c178 \
  --pipeline tests/fixtures/preprocess_pipeline.json
# -> Run ID: c4ccb55d

# 4. Export
dp export --input c4ccb55d --format parquet \
  --out-path output/result.parquet

Config File Reference

PostgreSQL - pg_config.json

json
{ "host": "localhost", "port": 5433, "database": "testdb", "user": "testuser", "password": "testpass" }

PostgreSQL write - pg_write_config.json

json
{ "source": "postgresql", "host": "localhost", "port": 5433,
  "database": "testdb", "user": "testuser", "password": "testpass",
  "table": "orders_processed" }

MySQL - mysql_config.json

json
{ "host": "localhost", "port": 3307, "database": "testdb", "user": "testuser", "password": "testpass" }

S3 / MinIO - s3_config.json

json
{ "endpoint_url": "http://localhost:9000", "aws_access_key_id": "minioadmin",
  "aws_secret_access_key": "minioadmin", "region": "us-east-1", "bucket": "testbucket" }

Kafka - kafka_config.json

json
{ "bootstrap_servers": "localhost:9092", "value_format": "json" }

Write Modes

All three write modes require the target table to already exist. The export pipeline does not create tables automatically.

Setup - create the target table first

bash - PostgreSQL
docker compose -f docker-compose.test.yml exec -T postgres \
  psql -U testuser -d testdb \
  -c "
CREATE TABLE IF NOT EXISTS orders_processed (
    id INTEGER PRIMARY KEY,
    product_name VARCHAR(100),
    category VARCHAR(50),
    price NUMERIC(10,6),
    quantity NUMERIC(10,6),
    status VARCHAR(20),
    created_at TIMESTAMP
);
"

Behaviour by mode

| Mode | Behaviour | Requires |
|---|---|---|
| replace | TRUNCATE the target table, then INSERT all rows. | Table must exist. |
| append | INSERT all rows without touching existing data. Fails if any row violates a unique constraint. | Table must exist. No duplicate IDs in source. |
| upsert | INSERT … ON CONFLICT DO UPDATE. Idempotent and safe to re-run. | Primary key or unique constraint on --primary-keys columns. |

When in doubt, use upsert with a primary key: it is idempotent and safe to re-run.
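
Upsert boils down to PostgreSQL's INSERT … ON CONFLICT DO UPDATE. The sketch below builds such a statement; upsert_sql is a hypothetical helper, and the real writer may construct its SQL differently:

```python
# Build a PostgreSQL upsert statement for the given table and key columns.
# Uses $1, $2, ... positional placeholders (asyncpg style) for illustration.
def upsert_sql(table: str, columns: list[str], primary_keys: list[str]) -> str:
    cols = ", ".join(columns)
    placeholders = ", ".join(f"${i + 1}" for i in range(len(columns)))
    updates = ", ".join(
        f"{c} = EXCLUDED.{c}" for c in columns if c not in primary_keys
    )
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(primary_keys)}) DO UPDATE SET {updates}"
    )

sql = upsert_sql("orders_processed", ["id", "price"], ["id"])
```

Because the conflict target is the primary key, re-running the same export simply rewrites the same rows.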

Pipeline Steps

All numeric steps enforce that the target columns are numeric - a TypeError is raised at runtime otherwise.

| Step type | Description | Column types |
|---|---|---|
| impute_mean | Replace nulls with the column mean. | Numeric |
| impute_median | Replace nulls with the column median. | Numeric |
| impute_mode | Replace nulls with the most frequent value. | Any |
| clip_iqr | Clip outliers to [Q1 − 1.5×IQR, Q3 + 1.5×IQR]. | Numeric |
| standard_scaler | Standardise to zero mean and unit variance. | Numeric |
| min_max_scaler | Scale values to the [0, 1] range. | Numeric |
| log_transform | Apply log1p transformation. | Non-negative numeric |
| drop_duplicates | Remove exact duplicate rows. | All (or subset) |
| drop_nulls | Drop rows where specified columns are null. | Any |
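
The clip_iqr bounds can be computed in plain Python as a sketch (statistics.quantiles defaults to the "exclusive" method; the actual step may use a different quantile estimator):

```python
import statistics

def iqr_bounds(values):
    """Return (Q1 - 1.5*IQR, Q3 + 1.5*IQR) for a list of numbers."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    return q1 - 1.5 * iqr, q3 + 1.5 * iqr

def clip_iqr(values):
    """Clamp every value into the IQR fence."""
    lo, hi = iqr_bounds(values)
    return [min(max(v, lo), hi) for v in values]
```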

Semantic Types

Full reference for all types returned by dp classify.

| Semantic Type | Description | Typical columns |
|---|---|---|
| identifier | Unique or near-unique key - not a model feature | id, uuid, user_id |
| numerical_continuous | Real-valued measurement or financial amount | price, weight, temperature |
| numerical_discrete | Whole-number count or frequency | quantity, num_orders, visits |
| categorical_nominal | Unordered categories with no numeric meaning | category, status, country |
| categorical_ordinal | Ordered categories (low → medium → high) | priority, severity, grade |
| text_freeform | Unstructured natural language text | description, comment, review |
| text_structured | Formatted string following a known pattern | email, phone, url, zip |
| datetime | Date, time, or timestamp | created_at, born, expires_at |
| boolean | Binary flag or indicator | is_active, has_verified, deleted |
| geospatial | Geographic coordinate or location field | latitude, longitude, city |
| embedding | Dense vector representation | embedding, vec |
| target_label | Supervised learning target or ground truth | target, label, y |
| unknown | Could not be classified with sufficient confidence | - (use --overrides to pin manually) |