CLI Reference v0.1.2

datapill CLI - dp

A command-line tool for ingesting, profiling, preprocessing, and exporting data across PostgreSQL, MySQL, S3, Kafka, REST APIs, and local files.

Python ≥ 3.11 · Polars · PyArrow · asyncio-native · entry point: dp

Installation

Install via pip. The dp entry point is registered automatically. The core install (~200 MB) includes rule-based classification, profiling, preprocessing, and export - no ML model required.

bash
# Core install (~200 MB) - rule-based classify, profile, preprocess, export
pip install datapill

# With ML extras (~3.5 GB) - enables embedding and full hybrid classify
pip install "datapill[ml]"

# Verify
dp --help
Without [ml], --mode hybrid runs entirely on rule-based logic - columns that cannot be resolved by rules are returned as unknown instead of being sent to the embedding model. Install datapill[ml] for full hybrid accuracy.

Quick Start

The minimal path from a CSV file to a profiled, preprocessed, exported dataset.

  1. Ingest a local CSV file into the artifact store.
  2. Classify columns to understand the semantic shape of the dataset.
  3. Profile the ingested dataset to understand its structure.
  4. Apply preprocessing transformations.
  5. Export the result to Parquet or write back to a database.
bash
dp ingest --source local_file --path data.csv
# -> Run ID:  abc123
# -> Artifact: abc123_ingest_output

dp classify --input abc123 --mode hybrid

dp profile --input abc123

dp preprocess --input abc123 --pipeline pipeline.json
# -> Run ID:  def456
# -> Output:  def456_preprocess_output

dp export --input def456 --format parquet --out-path output/result.parquet

Artifact System

Every pipeline command saves its outputs as artifacts - files stored in the artifact store directory (default: src/datapill/artifacts). Understanding artifacts is essential for chaining commands together.

Run ID vs Artifact ID

Each pipeline run generates a short run ID (e.g. f1662de6). The run produces one or more artifacts, each named <run_id>_<suffix>.

Example - preprocess run
Run ID f1662de6
Output f1662de6_preprocess_output ← data
Config f1662de6_preprocess_config ← step config snapshot

Passing input to commands

Any command that takes --input accepts either the run ID or a full artifact ID. When you pass a run ID, the system picks the most relevant artifact automatically based on the target command.

What you pass Example How it resolves
run_id only f1662de6 Selects the best artifact for the next command - e.g. profile and classify prefer ingest_output, export prefers preprocess_output
full artifact ID f1662de6_ingest_output Uses that exact artifact, no resolution needed
Use dp list to see all artifacts in the store along with their run IDs, features, and creation timestamps.
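
For example, both of the following invocations resolve to the same ingest output (the run ID is illustrative, taken from the table above):
bash
# Run ID - resolved to the ingest output automatically
dp profile --input f1662de6

# Full artifact ID - used exactly as given
dp profile --input f1662de6_ingest_output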

Artifact types by feature

Feature Artifacts produced Format
ingest <run_id>_ingest_output, <run_id>_ingest_schema Parquet, JSON
profile <profile_id>_detail, <profile_id>_summary JSON
preprocess <run_id>_preprocess_output, <run_id>_preprocess_config Parquet, JSON
classify <run_id>_classify_output JSON

Docker Services

All external services (PostgreSQL, MySQL, MinIO, Kafka) are defined in docker-compose.test.yml.

bash
# Start all services
docker compose -f docker-compose.test.yml up -d

# Check health status
docker compose -f docker-compose.test.yml ps
Service Image Port Credentials
postgres postgres:16-alpine 5433 testuser / testpass / testdb
mysql mysql:8-debian 3307 testuser / testpass / testdb
minio minio/minio:latest 9000 / 9001 minioadmin / minioadmin
kafka apache/kafka:3.7.0 9092 -

Seeding Test Data

On Windows with Git Bash, use docker compose exec -T with -c or -e flags instead of heredoc. Avoid /tmp paths as Git Bash maps them to Windows temp directories.

PostgreSQL

bash
docker compose -f docker-compose.test.yml exec -T postgres \
  psql -U testuser -d testdb \
  -c "
CREATE TABLE IF NOT EXISTS orders (
    id SERIAL PRIMARY KEY,
    product_name VARCHAR(100),
    category VARCHAR(50),
    price NUMERIC(10,2),
    quantity INTEGER,
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO orders (product_name, category, price, quantity, status)
SELECT
    'Product_' || i,
    (ARRAY['Electronics','Clothing','Food','Books'])[1 + (i % 4)],
    round((random() * 1000)::numeric, 2),
    (random() * 100)::int,
    (ARRAY['pending','shipped','delivered','cancelled'])[1 + (i % 4)]
FROM generate_series(1, 50000) i;
"

MySQL

bash
docker compose -f docker-compose.test.yml exec -T mysql \
  mysql -u testuser -ptestpass testdb \
  -e "
CREATE TABLE IF NOT EXISTS products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100), category VARCHAR(50),
    price DECIMAL(10,2), stock INT
);
INSERT INTO products (name, category, price, stock)
SELECT CONCAT('Product_', seq),
    ELT(1+(seq%4),'Electronics','Clothing','Food','Books'),
    ROUND(RAND()*1000,2), FLOOR(RAND()*100)
FROM ( SELECT a.N+b.N*10+c.N*100+d.N*1000+1 AS seq
  FROM (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
        UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
       (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
        UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) b,
       (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
        UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) c,
       (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
        UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) d
  LIMIT 10000) t;
"

MinIO - Upload a file

bash
# Uses Python (boto3) to avoid Windows path issues with mc CLI
python - << 'EOF'
import boto3
s3 = boto3.client("s3", aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin", endpoint_url="http://localhost:9000",
    region_name="us-east-1")
s3.upload_file("tests/fixtures/data.csv", "testbucket", "input/data.csv")
print("Upload OK")
EOF

Kafka - Produce test messages

bash
# Create topic first (use // prefix on Git Bash to prevent path conversion)
docker compose -f docker-compose.test.yml exec -T kafka \
  //opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic test-topic \
  --partitions 1 --replication-factor 1

# Produce 100 JSON messages
python - << 'EOF'
import asyncio, json
from aiokafka import AIOKafkaProducer

async def produce():
    p = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await p.start()
    for i in range(100):
        msg = json.dumps({"id": i, "product_name": f"Product_{i}",
            "category": ["Electronics","Clothing","Food","Books"][i%4],
            "price": round(i*10.5,2), "quantity": i%50}).encode()
        await p.send("test-topic", msg)
    await p.stop()
    print("Produced 100 messages")

asyncio.run(produce())
EOF

dp connector

Interact directly with a data source without going through a full pipeline. Useful for verifying connectivity, inspecting schema, and performing one-off operations like uploads, truncates, or Kafka message production before committing to a full ingest run.

Action Supported Sources Description
test all Verify connectivity and measure latency
schema all Inspect column names, types, and row estimate
upload s3, local_file Upload a local file to S3 or copy to a local destination
download s3 Download an S3 object to a local path
list s3 List objects in a bucket with optional prefix filter
exec postgresql, mysql Execute a raw SQL statement (DELETE, UPDATE, etc.)
truncate postgresql, mysql Truncate a table
produce kafka Send rows from a JSON/CSV file to a Kafka topic

test

bash
dp connector test --source postgresql --config tests/fixtures/pg_config.json
dp connector test --source mysql       --config tests/fixtures/mysql_config.json
dp connector test --source s3          --config tests/fixtures/s3_config.json
dp connector test --source kafka       --config tests/fixtures/kafka_config.json
dp connector test --source rest        --config tests/fixtures/rest_config.json

schema

bash
dp connector schema --source postgresql --config tests/fixtures/pg_config.json --table orders
dp connector schema --source mysql       --config tests/fixtures/mysql_config.json --table products
dp connector schema --source s3          --config tests/fixtures/s3_config.json --url s3://testbucket/input/data.csv
dp connector schema --source local_file  --path tests/fixtures/data.csv

upload

bash
dp connector upload \
  --source   s3 \
  --config   tests/fixtures/s3_config.json \
  --src-path tests/fixtures/data.csv \
  --dest-url s3://testbucket/upload/data.csv

download

bash
dp connector download \
  --source   s3 \
  --config   tests/fixtures/s3_config.json \
  --url      s3://testbucket/input/data.csv \
  --out-path output/downloaded.csv

list

bash
dp connector list --source s3 --config tests/fixtures/s3_config.json
dp connector list --source s3 --config tests/fixtures/s3_config.json --prefix input/

exec

bash
dp connector exec \
  --source postgresql \
  --config tests/fixtures/pg_config.json \
  --sql "DELETE FROM orders WHERE status='cancelled'"

dp connector exec \
  --source mysql \
  --config tests/fixtures/mysql_config.json \
  --sql "UPDATE products SET stock=0 WHERE price > 900"

truncate

bash
dp connector truncate --source postgresql --config tests/fixtures/pg_config.json --table orders_processed
dp connector truncate --source mysql       --config tests/fixtures/mysql_config.json --table products_processed

produce

bash
dp connector produce \
  --source     kafka \
  --config     tests/fixtures/kafka_config.json \
  --topic      test-topic \
  --file       tests/fixtures/data.csv

dp connector produce \
  --source     kafka \
  --config     tests/fixtures/kafka_config.json \
  --topic      test-topic \
  --file       tests/fixtures/data.csv \
  --key-column id

dp ingest

Reads data from a connector in streaming batches and persists it as a Parquet artifact in the artifact store. Also saves a schema snapshot (column names, types, row count) as a companion JSON artifact. The resulting artifact ID is what you pass to classify, profile, preprocess, or export.

Option Default Description
--source, -s required Connector type: local_file | postgresql | mysql | s3 | rest | kafka
--config, -c - Path to connector JSON config file
--path - Local file path (local_file)
--table - Table name (postgresql | mysql)
--url - S3 URL, e.g. s3://bucket/key.parquet
--topic - Kafka topic name
--endpoint - REST endpoint path
--out, -o src/datapill/artifacts Artifact store directory
--limit, -n - Max rows to read
--batch-size 50 000 Rows per streaming batch
--max-records - Max messages consumed (kafka)
bash
dp ingest --source local_file --path tests/fixtures/data.csv
dp ingest --source postgresql --config tests/fixtures/pg_config.json --table orders
dp ingest --source mysql --config tests/fixtures/mysql_config.json --table products
dp ingest --source s3 --config tests/fixtures/s3_config.json --url s3://testbucket/input/data.csv
dp ingest --source rest --config tests/fixtures/rest_config.json --endpoint posts
dp ingest --source kafka --config tests/fixtures/kafka_config.json --topic test-topic --max-records 100

dp profile

Computes descriptive statistics for every column and saves two artifacts: a detailed JSON (histograms, percentiles, top values) and a lighter summary JSON (null rates, distinct counts, warnings). A column-summary table is printed to the terminal at the end of the run.

Option Default Description
--input, -i required Run ID or full artifact ID
--mode, -m full full | summary
--sample-strategy none none | random | reservoir
--sample-size 100 000 Rows to sample when strategy ≠ none
--correlation pearson pearson | spearman | none
--out, -o src/datapill/artifacts Artifact store directory
bash
dp profile --input 72e0548a
dp profile --input 72e0548a --mode full --sample-strategy random --sample-size 10000 --correlation pearson
dp profile --input output/result.parquet --mode summary

dp preprocess

Applies a sequence of transformation steps defined in a JSON pipeline file. Use --dry-run to validate on the first 1 000 rows before committing. Use --checkpoint to save intermediate Parquet files after each step.

Option Default Description
--input, -i required Run ID or full artifact ID
--pipeline, -p required Path to pipeline JSON config file
--dry-run false Run on first 1 000 rows only - no artifact saved
--checkpoint false Save a Parquet file after each step
--out, -o src/datapill/artifacts Artifact store directory

Pipeline JSON format

json - tests/fixtures/preprocess_pipeline.json
{
  "steps": [
    {"type": "impute_mean",    "scope": {"columns": ["price", "quantity"]}},
    {"type": "clip_iqr",       "scope": {"columns": ["price"]}},
    {"type": "standard_scaler", "scope": {"columns": ["price", "quantity"]}}
  ]
}
bash
dp preprocess --input 72e0548a --pipeline tests/fixtures/preprocess_pipeline.json --dry-run
dp preprocess --input 72e0548a --pipeline tests/fixtures/preprocess_pipeline.json --checkpoint

dp export

Writes a dataset artifact to a local file or pushes it back to a database or S3. Supports three write modes - replace, append, and upsert.

Option Default Description
--input, -i required Run ID or full artifact ID
--format, -f required csv | parquet | json | jsonl | excel
--out-path - Destination file path
--connector, -c - Write-back connector config JSON
--write-mode replace replace | append | upsert
--primary-keys - Comma-separated key columns for upsert
--compression - snappy | zstd | gzip (parquet only)
--dry-run false Print first 10 rows, skip write
--out, -o src/datapill/artifacts Artifact store directory
bash
dp export --input f1662de6 --format parquet --out-path output/result.parquet
dp export --input f1662de6 --format csv --out-path output/result.csv
dp export --input f1662de6 --format parquet \
  --connector tests/fixtures/pg_write_config.json --write-mode upsert --primary-keys id
append + primary key constraint: Using --write-mode append on a table with a primary key will fail if duplicate IDs exist. Use upsert with --primary-keys instead.

dp list

Lists artifacts stored in the artifact store, sorted by creation time (newest first).

Option Default Description
--out, -o src/datapill/artifacts Artifact store directory
--feature, -f - Filter: ingest | profile | preprocess | classify
--limit, -n 20 Max number of artifacts to display
bash
dp list
dp list --feature ingest
dp list --feature classify
dp list --feature preprocess --limit 50

dp classify

Inspects every column in an ingested or preprocessed dataset and assigns it a semantic type - such as identifier, numerical_continuous, or categorical_nominal - along with a confidence score and the reasoning behind the decision.

Three classification modes are available. rule_based is instant and requires no additional dependencies; it uses regex patterns against column names and Polars dtype heuristics. embedding uses a local sentence-transformers model (all-MiniLM-L6-v2) to compare each column's semantic context against typed anchor phrases. hybrid (default) runs rule-based first and only falls back to the embedding model for columns that return unknown or a confidence below 0.65 - keeping the run fast while still handling ambiguous columns accurately.

The classify output is a JSON artifact. It is not Parquet and cannot be passed to preprocess or export. Use it as a reference when deciding which columns belong in numeric pipeline steps.

Option Default Description
--input, -i required Run ID or full artifact ID from a previous ingest or preprocess
--mode, -m hybrid rule_based (regex + dtype heuristics only, instant, no model) | embedding (semantic similarity via sentence-transformers) | hybrid (rule_based first, embedding for ambiguous columns)
--threshold, -t 0.0 Minimum confidence to keep a classification. Columns below the threshold are recorded as unknown. Range: 0.0 – 1.0
--overrides - JSON string to manually pin columns to a semantic type, bypassing classification entirely. e.g. '{"y": "target_label"}'
--out, -o src/datapill/artifacts Artifact store directory

Usage

bash
# Default: hybrid mode, no threshold filter
dp classify --input cf78c178

# Instant rule-based only, drop low-confidence results
dp classify --input cf78c178 --mode rule_based --threshold 0.65

# Full embedding pass (all columns go through the model)
dp classify --input cf78c178 --mode embedding

# Override specific columns, let the rest be classified normally
dp classify --input cf78c178 --mode hybrid \
  --overrides '{"status": "categorical_ordinal", "y": "target_label"}'

# Use a full artifact ID instead of a run ID
dp classify --input cf78c178_ingest_output --mode hybrid

After a successful run, a classification table is printed and the artifact ID is shown:

output
[OK] Classify complete in 38.8s - 7 columns classified
  Artifact: f9374114_classify_output
  Columns:  7

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Column       ┃ Semantic Type        ┃ Confidence ┃ Source     ┃ Overridden ┃ Reasoning                                      ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ id           │ identifier           │       0.65 │ rule_based │ no         │ column name matches identifier pattern         │
│ product_name │ categorical_nominal  │       0.70 │ embedding  │ no         │ embedding similarity 0.263 to categorical_…    │
│ category     │ categorical_nominal  │       0.70 │ embedding  │ no         │ embedding similarity 0.422 to categorical_…    │
│ price        │ numerical_continuous │       0.68 │ rule_based │ no         │ column name matches financial numeric pattern  │
│ quantity     │ numerical_discrete   │       0.65 │ rule_based │ no         │ column name matches discrete count pattern     │
│ status       │ categorical_nominal  │       0.70 │ embedding  │ no         │ embedding similarity 0.286 to categorical_…    │
│ created_at   │ datetime             │       0.70 │ rule_based │ no         │ polars dtype is Datetime(…)                    │
└──────────────┴──────────────────────┴────────────┴────────────┴────────────┴────────────────────────────────────────────────┘

Confidence colour coding

The Confidence column is colour-coded in the terminal: green ≥ 0.70 · yellow 0.55 – 0.69 · red < 0.55. Overridden columns always show confidence 1.00 and source override.

ML extras required: embedding and hybrid modes require pip install "datapill[ml]" (~3.5 GB, includes PyTorch + sentence-transformers). On first use, all-MiniLM-L6-v2 (~90 MB) is downloaded from Hugging Face Hub; set HF_TOKEN to enable higher rate limits. Subsequent runs load from the local cache.
Use the classification result to decide which columns go into which preprocess steps. Columns classified as numerical_continuous or numerical_discrete are safe candidates for impute_mean, clip_iqr, and standard_scaler. Columns classified as identifier, datetime, or text_freeform should generally be excluded from numeric steps.
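
As a sketch, a pipeline informed by the classification above might scope the numeric steps to price and quantity, one-hot encode the nominal columns, and leave id and created_at untouched (file name and column choices are illustrative):
bash
cat > classify_informed_pipeline.json << 'EOF'
{
  "steps": [
    {"type": "impute_mean",     "scope": {"columns": ["price", "quantity"]}},
    {"type": "clip_iqr",        "scope": {"columns": ["price"]}},
    {"type": "standard_scaler", "scope": {"columns": ["price", "quantity"]}},
    {"type": "onehot",          "scope": {"columns": ["category", "status"]}}
  ]
}
EOF

dp preprocess --input cf78c178 --pipeline classify_informed_pipeline.json --dry-run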

dp run

Runs an ingest followed immediately by a profile in a single command, driven by a JSON config file.

Config file format

json - tests/fixtures/run_config_pg.json
{
  "source": "postgresql",
  "connector": {
    "host": "localhost", "port": 5433,
    "database": "testdb", "user": "testuser", "password": "testpass"
  },
  "query":   { "table": "orders" },
  "ingest":  { "batch_size": 10000 },
  "profile": {
    "mode": "full", "sample_strategy": "random",
    "sample_size": 10000, "correlation": "pearson"
  }
}
bash
dp run tests/fixtures/run_config_pg.json
dp run tests/fixtures/run_config_pg.json --out /tmp/dp_artifacts

dp pipeline export

Generates a standalone Python script (run_<name>.py) from a previous preprocess run. The resulting script reproduces the same preprocessing pipeline and writes the output to a file – without any dependency on the datapill package. This is ideal for deployment, scheduling, or integrating into production workflows.

You provide --input (a preprocess run ID or its config artifact) together with an --ingest-config that describes where the original data comes from. The command reads the saved preprocessing steps and merges them into a self-contained script that needs only Polars (plus Pandas for Excel output).

Option Default Description
--input, -i required Preprocess run ID or full artifact ID (xxx_preprocess_config)
--ingest-config, -c - Connector JSON config file (same as dp ingest --config)
--source, -s local_file Connector type: local_file | postgresql | mysql | s3 | rest | kafka
--path - Local file path (local_file)
--table - Table name (postgresql | mysql)
--url - S3 URL, e.g. s3://bucket/key.csv
--format, -f parquet Output format: csv | parquet | json | jsonl | excel
--out-path output/result.parquet Output path hard-coded into the generated script
--name, -n - Base name for the generated files, e.g. orders_pipeline produces run_orders_pipeline.py
--compression - snappy | zstd | gzip (parquet only)
--with-tests false Also generate test_<name>.py with unit tests for each step
--out-dir, -o generated Directory to write the generated files
--store src/datapill/artifacts Artifact store directory

Usage

bash
# Generate from a previous preprocess run (local file source)
dp pipeline export \
  --input       def456 \
  --source      local_file \
  --path        data.csv \
  --format      parquet \
  --out-path    output/clean.parquet \
  --name        sales_pipeline

# With PostgreSQL as data source
dp pipeline export \
  --input         def456 \
  --source        postgresql \
  --ingest-config tests/fixtures/pg_config.json \
  --table         orders \
  --format        csv \
  --out-path      output/orders_clean.csv \
  --with-tests

# Use a full config artifact ID (from a preprocess run)
dp pipeline export \
  --input  def456_preprocess_config \
  --source s3 \
  --url    s3://mybucket/input/data.csv
Generated script behaviour: The script reads data from the exact source you provided, applies every preprocessing step exactly as defined in the pipeline, and saves the result. It is completely independent of datapill and requires only polars (plus pandas for Excel output). If --with-tests is given, a test_<name>.py is created with pytest tests that validate each step on a small sample.
When using local_file as the source, the generated script expects the input file at the exact path you provided to --path. If you intend to move the script to another machine, either hard-code the new path in the script or pass the path as a command-line argument (the generated script accepts --input-path).
bash
# After generation, run the script locally
python generated/run_sales_pipeline.py --dry-run       # preview first 10 rows
python generated/run_sales_pipeline.py                # full run

# With tests
python -m pytest generated/test_sales_pipeline.py -v
The generated script also contains a --input-path / --input-table / --input-url override, so you can reuse the same script for different input sources without editing the code.
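
For example, a script generated from the local-file run above could be reused against a different input file without regenerating it (script name and path are illustrative):
bash
python generated/run_sales_pipeline.py --input-path data/new_batch.csv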

Walkthrough - PostgreSQL end-to-end

bash
# 1. Verify connection
dp connector test   --source postgresql --config tests/fixtures/pg_config.json
dp connector schema --source postgresql --config tests/fixtures/pg_config.json --table orders

# 2. Ingest
dp ingest --source postgresql --config tests/fixtures/pg_config.json --table orders
# -> Run ID: 72e0548a

# 3. Profile
dp profile --input 72e0548a --sample-strategy random --sample-size 10000

# 4. Preprocess
dp preprocess \
  --input    72e0548a \
  --pipeline tests/fixtures/preprocess_pipeline.json \
  --checkpoint
# -> Run ID: f1662de6

# 5a. Export to Parquet
dp export --input f1662de6 --format parquet --out-path output/result.parquet

# 5b. Or write back to PostgreSQL
dp export --input f1662de6 --format parquet \
  --connector tests/fixtures/pg_write_config.json \
  --write-mode upsert --primary-keys id

Walkthrough - MySQL end-to-end

bash
dp connector test   --source mysql --config tests/fixtures/mysql_config.json
dp connector schema --source mysql --config tests/fixtures/mysql_config.json --table products

dp ingest --source mysql --config tests/fixtures/mysql_config.json --table products
# -> Run ID: d7859e9e

dp profile --input d7859e9e

dp export --input d7859e9e --format parquet \
  --connector tests/fixtures/mysql_write_config.json --write-mode replace

Walkthrough - S3 / MinIO

bash
dp connector test --source s3 --config tests/fixtures/s3_config.json
dp connector upload --source s3 --config tests/fixtures/s3_config.json \
  --src-path tests/fixtures/data.csv --dest-url s3://testbucket/input/data.csv
dp connector list --source s3 --config tests/fixtures/s3_config.json --prefix input/
dp ingest --source s3 --config tests/fixtures/s3_config.json --url s3://testbucket/input/data.csv
dp export --input <run_id> --format parquet --connector tests/fixtures/s3_write_config.json
dp connector download --source s3 --config tests/fixtures/s3_config.json \
  --url s3://testbucket/output/processed.parquet --out-path output/processed.parquet

Walkthrough - Kafka

GroupCoordinatorNotAvailableError warnings are normal in single-node KRaft mode and do not affect ingestion results.
bash
docker compose -f docker-compose.test.yml exec -T kafka \
  //opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic test-topic --partitions 1 --replication-factor 1

dp connector produce --source kafka \
  --config tests/fixtures/kafka_config.json \
  --topic  test-topic \
  --file   tests/fixtures/data.csv

dp ingest --source kafka \
  --config      tests/fixtures/kafka_config.json \
  --topic       test-topic \
  --max-records 100

Walkthrough - REST API

On Git Bash, never use a leading / in --endpoint. Write posts instead of /posts.
bash
dp connector test   --source rest --config tests/fixtures/rest_config.json
dp connector schema --source rest --config tests/fixtures/rest_config.json
dp ingest --source rest --config tests/fixtures/rest_config.json --endpoint posts
dp ingest --source rest --config tests/fixtures/rest_pagination_config.json --endpoint products

Walkthrough - classify after ingest

Run dp classify right after ingest to understand the semantic shape of your dataset before designing a preprocess pipeline. Use the results to decide which columns to include in each step.

bash
# 1. Ingest
dp ingest --source postgresql --config tests/fixtures/pg_config.json --table orders
# -> Run ID: cf78c178

# 2. Classify - hybrid mode (recommended)
dp classify --input cf78c178 --mode hybrid
# -> Artifact: f9374114_classify_output
# price -> numerical_continuous  (safe: impute_mean, clip_iqr, standard_scaler)
# quantity -> numerical_discrete  (safe: impute_median)
# category, status -> categorical_nominal  (skip numeric steps)
# id -> identifier  (exclude from all transformations)
# created_at -> datetime  (exclude from numeric steps)

# 2b. Rule-based only - instant, no model required
dp classify --input cf78c178 --mode rule_based --threshold 0.65

# 2c. Override a column when you know better than the classifier
dp classify --input cf78c178 --mode hybrid \
  --overrides '{"status": "categorical_ordinal"}'

# 3. Preprocess - informed by classify output above
dp preprocess \
  --input    cf78c178 \
  --pipeline tests/fixtures/preprocess_pipeline.json \
  --dry-run

dp preprocess \
  --input    cf78c178 \
  --pipeline tests/fixtures/preprocess_pipeline.json
# -> Run ID: c4ccb55d

# 4. Export
dp export --input c4ccb55d --format parquet \
  --out-path output/result.parquet

Config File Reference

PostgreSQL - pg_config.json

json
{ "host": "localhost", "port": 5433, "database": "testdb", "user": "testuser", "password": "testpass" }

PostgreSQL write - pg_write_config.json

json
{ "source": "postgresql", "host": "localhost", "port": 5433,
  "database": "testdb", "user": "testuser", "password": "testpass",
  "table": "orders_processed" }

MySQL - mysql_config.json

json
{ "host": "localhost", "port": 3307, "database": "testdb", "user": "testuser", "password": "testpass" }

S3 / MinIO - s3_config.json

json
{ "endpoint_url": "http://localhost:9000", "aws_access_key_id": "minioadmin",
  "aws_secret_access_key": "minioadmin", "region": "us-east-1", "bucket": "testbucket" }

Kafka - kafka_config.json

json
{ "bootstrap_servers": "localhost:9092", "value_format": "json" }

Write Modes

All three write modes require the target table to already exist. The export pipeline does not create tables automatically.

Setup - create the target table first

bash - PostgreSQL
docker compose -f docker-compose.test.yml exec -T postgres \
  psql -U testuser -d testdb \
  -c "
CREATE TABLE IF NOT EXISTS orders_processed (
    id INTEGER PRIMARY KEY,
    product_name VARCHAR(100),
    category VARCHAR(50),
    price NUMERIC(10,6),
    quantity NUMERIC(10,6),
    status VARCHAR(20),
    created_at TIMESTAMP
);
"

Behaviour by mode

Mode Behaviour Requires
replace TRUNCATE the target table, then INSERT all rows. Table must exist.
append INSERT all rows without touching existing data. Fails if any row violates a unique constraint. Table must exist. No duplicate IDs in source.
upsert INSERT … ON CONFLICT DO UPDATE. Idempotent and safe to re-run. Primary key or unique constraint on --primary-keys columns.
When in doubt, use upsert with a primary key. It is idempotent and safe to re-run.

Pipeline Steps

Steps are defined in a JSON pipeline file. Each step has a type, an optional scope.columns list, and an optional params object. If columns is omitted or empty, the step applies to all eligible columns. Numeric steps enforce that target columns are numeric - a TypeError is raised at runtime otherwise.
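
As a minimal sketch (file name arbitrary), a step with an empty columns list runs against every eligible column:
bash
cat > minimal_pipeline.json << 'EOF'
{
  "steps": [
    {"type": "impute_median", "scope": {"columns": []}}
  ]
}
EOF

dp preprocess --input <run_id> --pipeline minimal_pipeline.json --dry-run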

Missing Values

Step type Description Column types params
impute_mean Replace nulls with the column mean. Numeric -
impute_median Replace nulls with the column median. Numeric -
impute_mode Replace nulls with the most frequent value. Any -
drop_missing Drop rows where any of the specified columns are null. Any -

Outliers

Step type Description Column types params
clip_iqr Clip values to [Q1 − 1.5×IQR, Q3 + 1.5×IQR]. Numeric -
clip_zscore Clip values beyond N standard deviations from the mean. Numeric threshold (float, default 3.0)
json - clip_zscore with custom threshold
{ "type": "clip_zscore", "scope": { "columns": ["price"] }, "params": { "threshold": 2.5 } }

Scaling

Step type Description Column types params
standard_scaler Standardise to zero mean and unit variance. Skips columns with std = 0. Numeric -
minmax_scaler Scale to [0, 1]. Skips columns where max = min. Numeric -
robust_scaler Scale using median and IQR - robust to outliers. Skips columns with IQR = 0. Numeric -

Encoding

Step type Description Column types params
onehot One-hot encode each category as a new Int8 column named col__value. Drops the original column. String / Categorical -
ordinal Map categories to integers. Auto-sorts alphabetically if no order is given. String / Categorical order (dict: column -> ordered list of values)
json - ordinal with explicit order
{
  "type": "ordinal",
  "scope": { "columns": ["priority", "size"] },
  "params": {
    "order": {
      "priority": ["low", "medium", "high"],
      "size":     ["S", "M", "L", "XL"]
    }
  }
}

Structure

Step type Description params
select_columns Keep only the specified columns, drop all others. - (uses scope.columns)
drop_columns Drop the specified columns. - (uses scope.columns)
rename_columns Rename columns using a mapping. mapping (dict: old_name -> new_name)
cast_dtype Cast columns to target dtypes. casts (dict: column -> dtype string)
deduplicate Remove exact duplicate rows. If columns is set, deduplicates on that subset only. - (uses scope.columns as subset)
json - rename_columns
{ "type": "rename_columns", "scope": { "columns": [] }, "params": { "mapping": { "old_name": "new_name", "qty": "quantity" } } }
json - cast_dtype
{
  "type": "cast_dtype",
  "scope": { "columns": [] },
  "params": {
    "casts": {
      "price":      "float64",
      "quantity":   "int32",
      "is_active":  "bool",
      "created_at": "datetime"
    }
  }
}

Supported dtype strings: int8 · int16 · int32 · int64 · float32 · float64 · str · bool · date · datetime

Custom Python

Step type Description params
custom_python Run a sandboxed Python function (via RestrictedPython). The function receives a pl.DataFrame and must return a pl.DataFrame. code (string, Python source) · func (string, function name, default transform)
json - custom_python
{
  "type": "custom_python",
  "scope": { "columns": [] },
  "params": {
    "func": "transform",
    "code": "def transform(df):
    return df.with_columns((pl.col('revenue') / pl.col('units')).alias('avg_price'))"
  }
}
Custom code is validated by an AST analyzer before execution. Banned: arbitrary imports, dunder access, dangerous builtins. pl (Polars) is injected automatically. For untrusted code, use the Docker sandbox backend instead.

Step conflict warnings

The pipeline automatically detects and warns about common mistakes before execution:

Pattern Warning
impute_* followed by drop_missing on the same column drop_missing is redundant - column was already imputed
clip_iqr followed by clip_zscore on the same column overlapping outlier clipping steps detected
standard_scaler / minmax_scaler / robust_scaler without a prior impute on the same column scaler may produce NaN - add impute before scaling
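
For instance, a pipeline sketch like the following scales a column that was never imputed, so the pre-execution check should surface the "scaler may produce NaN" warning from the table above (file name illustrative):
bash
cat > scaler_only_pipeline.json << 'EOF'
{
  "steps": [
    {"type": "standard_scaler", "scope": {"columns": ["price"]}}
  ]
}
EOF

dp preprocess --input <run_id> --pipeline scaler_only_pipeline.json --dry-run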

Semantic Types

Full reference for all types returned by dp classify.

Semantic Type Description Typical columns
identifier Unique or near-unique key - not a model feature id, uuid, user_id
numerical_continuous Real-valued measurement or financial amount price, weight, temperature
numerical_discrete Whole-number count or frequency quantity, num_orders, visits
categorical_nominal Unordered categories with no numeric meaning category, status, country
categorical_ordinal Ordered categories (low -> medium -> high) priority, severity, grade
text_freeform Unstructured natural language text description, comment, review
text_structured Formatted string following a known pattern email, phone, url, zip
datetime Date, time, or timestamp created_at, born, expires_at
boolean Binary flag or indicator is_active, has_verified, deleted
geospatial Geographic coordinate or location field latitude, longitude, city
embedding Dense vector representation embedding, vec
target_label Supervised learning target or ground truth target, label, y
unknown Could not be classified with sufficient confidence - (use --overrides to pin manually)