datapill CLI - dp
A command-line tool for ingesting, profiling, preprocessing, and exporting data across PostgreSQL, MySQL, S3, Kafka, REST APIs, and local files.
Installation
Install via pip. The dp entry point is registered automatically. The core install (~200 MB) includes rule-based classification, profiling, preprocessing, and export - no ML model required.
```bash
# Core install (~200 MB) - rule-based classify, profile, preprocess, export
pip install datapill

# With ML extras (~3.5 GB) - enables embedding and full hybrid classify
pip install "datapill[ml]"

# Verify
dp --help
```
Without datapill[ml], --mode hybrid runs entirely on rule-based logic - columns that cannot be resolved by rules are returned as unknown instead of being sent to the embedding model. Install datapill[ml] for full hybrid accuracy.
Quick Start
The minimal path from a CSV file to a profiled, preprocessed, exported dataset.
- Ingest a local CSV file into the artifact store.
- Classify columns to understand the semantic shape of the dataset.
- Profile the ingested dataset to understand its structure.
- Apply preprocessing transformations.
- Export the result to Parquet or write back to a database.
```bash
dp ingest --source local_file --path data.csv
# -> Run ID: abc123
# -> Artifact: abc123_ingest_output

dp classify --input abc123 --mode hybrid
dp profile --input abc123

dp preprocess --input abc123 --pipeline pipeline.json
# -> Run ID: def456
# -> Output: def456_preprocess_output

dp export --input def456 --format parquet --out-path output/result.parquet
```
Artifact System
Every pipeline command saves its outputs as artifacts - files stored in the artifact store directory (default: src/datapill/artifacts). Understanding artifacts is essential for chaining commands together.
Run ID vs Artifact ID
Each pipeline run generates a short run ID (e.g. f1662de6). The run produces one or more artifacts, each named <run_id>_<suffix>.
Passing input to commands
Any command that takes --input accepts either the run ID or a full artifact ID. When you pass a run ID, the system picks the most relevant artifact automatically based on the target command.
| What you pass | How it resolves |
|---|---|
| run_id only (f1662de6) | Selects the best artifact for the next command - e.g. profile and classify prefer ingest_output, export prefers preprocess_output |
| Full artifact ID (f1662de6_ingest_output) | Uses that exact artifact, no resolution needed |
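For example, both invocations below resolve to the same ingest artifact - the first relies on automatic resolution, the second names the artifact explicitly:

```bash
# Run ID - profile picks the ingest_output artifact automatically
dp profile --input f1662de6

# Full artifact ID - no resolution, uses exactly this artifact
dp profile --input f1662de6_ingest_output
```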
Run dp list to see all artifacts in the store along with their run IDs, features, and creation timestamps.
Artifact types by feature
| Feature | Artifacts produced | Format |
|---|---|---|
| ingest | <run_id>_ingest_output, <run_id>_ingest_schema | Parquet, JSON |
| profile | <profile_id>_detail, <profile_id>_summary | JSON |
| preprocess | <run_id>_preprocess_output, <run_id>_preprocess_config | Parquet, JSON |
| classify | <run_id>_classify_output | JSON |
Docker Services
All external services (PostgreSQL, MySQL, MinIO, Kafka) are defined in docker-compose.test.yml.
```bash
# Start all services
docker compose -f docker-compose.test.yml up -d

# Check health status
docker compose -f docker-compose.test.yml ps
```
| Service | Image | Port | Credentials |
|---|---|---|---|
| postgres | postgres:16-alpine | 5433 | testuser / testpass / testdb |
| mysql | mysql:8-debian | 3307 | testuser / testpass / testdb |
| minio | minio/minio:latest | 9000 / 9001 | minioadmin / minioadmin |
| kafka | apache/kafka:3.7.0 | 9092 | - |
Seeding Test Data
Use docker compose exec -T with -c or -e flags instead of heredocs. Avoid /tmp paths, as Git Bash maps them to Windows temp directories.
PostgreSQL
```bash
docker compose -f docker-compose.test.yml exec -T postgres \
  psql -U testuser -d testdb \
  -c "
    CREATE TABLE IF NOT EXISTS orders (
      id SERIAL PRIMARY KEY,
      product_name VARCHAR(100),
      category VARCHAR(50),
      price NUMERIC(10,2),
      quantity INTEGER,
      status VARCHAR(20),
      created_at TIMESTAMP DEFAULT NOW()
    );
    INSERT INTO orders (product_name, category, price, quantity, status)
    SELECT
      'Product_' || i,
      (ARRAY['Electronics','Clothing','Food','Books'])[1 + (i % 4)],
      round((random() * 1000)::numeric, 2),
      (random() * 100)::int,
      (ARRAY['pending','shipped','delivered','cancelled'])[1 + (i % 4)]
    FROM generate_series(1, 50000) i;
  "
```
MySQL
```bash
docker compose -f docker-compose.test.yml exec -T mysql \
  mysql -u testuser -ptestpass testdb \
  -e "
    CREATE TABLE IF NOT EXISTS products (
      id INT AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(100),
      category VARCHAR(50),
      price DECIMAL(10,2),
      stock INT
    );
    INSERT INTO products (name, category, price, stock)
    SELECT
      CONCAT('Product_', seq),
      ELT(1+(seq%4),'Electronics','Clothing','Food','Books'),
      ROUND(RAND()*1000,2),
      FLOOR(RAND()*100)
    FROM (
      SELECT a.N+b.N*10+c.N*100+d.N*1000+1 AS seq
      FROM (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
           (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) b,
           (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) c,
           (SELECT 0 N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) d
      LIMIT 10000
    ) t;
  "
```
MinIO - Upload a file
```bash
# Uses Python (boto3) to avoid Windows path issues with the mc CLI
python - << 'EOF'
import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    endpoint_url="http://localhost:9000",
    region_name="us-east-1",
)
s3.upload_file("tests/fixtures/data.csv", "testbucket", "input/data.csv")
print("Upload OK")
EOF
```
Kafka - Produce test messages
```bash
# Create topic first (use // prefix on Git Bash to prevent path conversion)
docker compose -f docker-compose.test.yml exec -T kafka \
  //opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic test-topic \
  --partitions 1 --replication-factor 1

# Produce 100 JSON messages
python - << 'EOF'
import asyncio, json
from aiokafka import AIOKafkaProducer

async def produce():
    p = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await p.start()
    for i in range(100):
        msg = json.dumps({
            "id": i,
            "product_name": f"Product_{i}",
            "category": ["Electronics", "Clothing", "Food", "Books"][i % 4],
            "price": round(i * 10.5, 2),
            "quantity": i % 50,
        }).encode()
        await p.send("test-topic", msg)
    await p.stop()
    print("Produced 100 messages")

asyncio.run(produce())
EOF
```
dp connector
Interact directly with a data source without going through a full pipeline. Useful for verifying connectivity, inspecting schema, and performing one-off operations like uploads, truncates, or Kafka message production before committing to a full ingest run.
| Action | Supported Sources | Description |
|---|---|---|
| test | all | Verify connectivity and measure latency |
| schema | all | Inspect column names, types, and row estimate |
| upload | s3, local_file | Upload a local file to S3 or copy to a local destination |
| download | s3 | Download an S3 object to a local path |
| list | s3 | List objects in a bucket with optional prefix filter |
| exec | postgresql, mysql | Execute a raw SQL statement (DELETE, UPDATE, etc.) |
| truncate | postgresql, mysql | Truncate a table |
| produce | kafka | Send rows from a JSON/CSV file to a Kafka topic |
test
```bash
dp connector test --source postgresql --config tests/fixtures/pg_config.json
dp connector test --source mysql --config tests/fixtures/mysql_config.json
dp connector test --source s3 --config tests/fixtures/s3_config.json
dp connector test --source kafka --config tests/fixtures/kafka_config.json
dp connector test --source rest --config tests/fixtures/rest_config.json
```
schema
```bash
dp connector schema --source postgresql --config tests/fixtures/pg_config.json --table orders
dp connector schema --source mysql --config tests/fixtures/mysql_config.json --table products
dp connector schema --source s3 --config tests/fixtures/s3_config.json --url s3://testbucket/input/data.csv
dp connector schema --source local_file --path tests/fixtures/data.csv
```
upload
```bash
dp connector upload \
  --source s3 \
  --config tests/fixtures/s3_config.json \
  --src-path tests/fixtures/data.csv \
  --dest-url s3://testbucket/upload/data.csv
```
download
```bash
dp connector download \
  --source s3 \
  --config tests/fixtures/s3_config.json \
  --url s3://testbucket/input/data.csv \
  --out-path output/downloaded.csv
```
list
```bash
dp connector list --source s3 --config tests/fixtures/s3_config.json
dp connector list --source s3 --config tests/fixtures/s3_config.json --prefix input/
```
exec
```bash
dp connector exec \
  --source postgresql \
  --config tests/fixtures/pg_config.json \
  --sql "DELETE FROM orders WHERE status='cancelled'"

dp connector exec \
  --source mysql \
  --config tests/fixtures/mysql_config.json \
  --sql "UPDATE products SET stock=0 WHERE price > 900"
```
truncate
```bash
dp connector truncate --source postgresql --config tests/fixtures/pg_config.json --table orders_processed
dp connector truncate --source mysql --config tests/fixtures/mysql_config.json --table products_processed
```
produce
```bash
dp connector produce \
  --source kafka \
  --config tests/fixtures/kafka_config.json \
  --topic test-topic \
  --file tests/fixtures/data.csv

dp connector produce \
  --source kafka \
  --config tests/fixtures/kafka_config.json \
  --topic test-topic \
  --file tests/fixtures/data.csv \
  --key-column id
```
dp ingest
Reads data from a connector in streaming batches and persists it as a Parquet artifact in the artifact store. Also saves a schema snapshot (column names, types, row count) as a companion JSON artifact. The resulting artifact ID is what you pass to classify, profile, preprocess, or export.
| Option | Default | Description |
|---|---|---|
| --source, -s | required | Connector type: local_file \| postgresql \| mysql \| s3 \| rest \| kafka |
| --config, -c | - | Path to connector JSON config file |
| --path | - | Local file path (local_file) |
| --table | - | Table name (postgresql \| mysql) |
| --url | - | S3 URL, e.g. s3://bucket/key.parquet |
| --topic | - | Kafka topic name |
| --endpoint | - | REST endpoint path |
| --out, -o | src/datapill/artifacts | Artifact store directory |
| --limit, -n | - | Max rows to read |
| --batch-size | 50 000 | Rows per streaming batch |
| --max-records | - | Max messages consumed (kafka) |
```bash
dp ingest --source local_file --path tests/fixtures/data.csv
dp ingest --source postgresql --config tests/fixtures/pg_config.json --table orders
dp ingest --source mysql --config tests/fixtures/mysql_config.json --table products
dp ingest --source s3 --config tests/fixtures/s3_config.json --url s3://testbucket/input/data.csv
dp ingest --source rest --config tests/fixtures/rest_config.json --endpoint posts
dp ingest --source kafka --config tests/fixtures/kafka_config.json --topic test-topic --max-records 100
```
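The --limit and --batch-size options can be combined with any source - for example, to pull a quick sample from a large table without ingesting all of it:

```bash
# Ingest at most 1000 rows, streamed in batches of 500
dp ingest --source postgresql --config tests/fixtures/pg_config.json \
  --table orders --limit 1000 --batch-size 500
```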
dp profile
Computes descriptive statistics for every column and saves two artifacts: a detailed JSON (histograms, percentiles, top values) and a lighter summary JSON (null rates, distinct counts, warnings). A column-summary table is printed to the terminal at the end of the run.
| Option | Default | Description |
|---|---|---|
| --input, -i | required | Run ID or full artifact ID |
| --mode, -m | full | full \| summary |
| --sample-strategy | none | none \| random \| reservoir |
| --sample-size | 100 000 | Rows to sample when strategy ≠ none |
| --correlation | pearson | pearson \| spearman \| none |
| --out, -o | src/datapill/artifacts | Artifact store directory |
```bash
dp profile --input 72e0548a
dp profile --input 72e0548a --mode full --sample-strategy random --sample-size 10000 --correlation pearson
dp profile --input output/result.parquet --mode summary
```
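After a run, the detail and summary artifacts can be located with the store listing:

```bash
# Show the newest profile artifacts (detail + summary JSON)
dp list --feature profile --limit 5
```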
dp preprocess
Applies a sequence of transformation steps defined in a JSON pipeline file. Use --dry-run to validate on the first 1 000 rows before committing. Use --checkpoint to save intermediate Parquet files after each step.
| Option | Default | Description |
|---|---|---|
| --input, -i | required | Run ID or full artifact ID |
| --pipeline, -p | required | Path to pipeline JSON config file |
| --dry-run | false | Run on first 1 000 rows only - no artifact saved |
| --checkpoint | false | Save a Parquet file after each step |
| --out, -o | src/datapill/artifacts | Artifact store directory |
Pipeline JSON format
{
"steps": [
{"type": "impute_mean", "scope": {"columns": ["price", "quantity"]}},
{"type": "clip_iqr", "scope": {"columns": ["price"]}},
{"type": "standard_scaler", "scope": {"columns": ["price", "quantity"]}}
]
}
```bash
dp preprocess --input 72e0548a --pipeline tests/fixtures/preprocess_pipeline.json --dry-run
dp preprocess --input 72e0548a --pipeline tests/fixtures/preprocess_pipeline.json --checkpoint
```
dp export
Writes a dataset artifact to a local file or pushes it back to a database or S3. Supports three write modes - replace, append, and upsert.
| Option | Default | Description |
|---|---|---|
| --input, -i | required | Run ID or full artifact ID |
| --format, -f | required | csv \| parquet \| json \| jsonl \| excel |
| --out-path | - | Destination file path |
| --connector, -c | - | Write-back connector config JSON |
| --write-mode | replace | replace \| append \| upsert |
| --primary-keys | - | Comma-separated key columns for upsert |
| --compression | - | snappy \| zstd \| gzip (parquet only) |
| --dry-run | false | Print first 10 rows, skip write |
| --out, -o | src/datapill/artifacts | Artifact store directory |
```bash
dp export --input f1662de6 --format parquet --out-path output/result.parquet
dp export --input f1662de6 --format csv --out-path output/result.csv

dp export --input f1662de6 --format parquet \
  --connector tests/fixtures/pg_write_config.json --write-mode upsert --primary-keys id
```
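--dry-run is a cheap way to check the output shape before writing anything - it prints the first 10 rows and skips the write entirely:

```bash
# Preview the export without touching the destination
dp export --input f1662de6 --format csv --out-path output/result.csv --dry-run
```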
--write-mode append on a table with a primary key will fail if duplicate IDs exist. Use upsert with --primary-keys instead.
dp list
Lists artifacts stored in the artifact store, sorted by creation time (newest first).
| Option | Default | Description |
|---|---|---|
| --out, -o | src/datapill/artifacts | Artifact store directory |
| --feature, -f | - | Filter: ingest \| profile \| preprocess \| classify |
| --limit, -n | 20 | Max number of artifacts to display |
```bash
dp list
dp list --feature ingest
dp list --feature classify
dp list --feature preprocess --limit 50
```
dp classify
Inspects every column in an ingested or preprocessed dataset and assigns it a
semantic type - such as identifier,
numerical_continuous, or categorical_nominal - along with
a confidence score and the reasoning behind the decision.
Three classification modes are available. rule_based is instant and
requires no additional dependencies; it uses regex patterns against column names and
Polars dtype heuristics. embedding uses a local
sentence-transformers model (all-MiniLM-L6-v2) to compare
each column's semantic context against typed anchor phrases. hybrid
(default) runs rule-based first and only falls back to the embedding model for columns
that return unknown or a confidence below 0.65 -
keeping the run fast while still handling ambiguous columns accurately.
The classify output is a JSON artifact. It is not Parquet and cannot be
passed to preprocess or export. Use it as a reference when
deciding which columns belong in numeric pipeline steps.
| Option | Default | Description |
|---|---|---|
| --input, -i | required | Run ID or full artifact ID from a previous ingest or preprocess |
| --mode, -m | hybrid | rule_based - regex + dtype heuristics only (instant, no model); embedding - semantic similarity via sentence-transformers; hybrid - rule_based first, embedding for ambiguous columns |
| --threshold, -t | 0.0 | Minimum confidence to keep a classification. Columns below the threshold are recorded as unknown. Range: 0.0 – 1.0 |
| --overrides | - | JSON string to manually pin columns to a semantic type, bypassing classification entirely. e.g. '{"y": "target_label"}' |
| --out, -o | src/datapill/artifacts | Artifact store directory |
Usage
```bash
# Default: hybrid mode, no threshold filter
dp classify --input cf78c178

# Instant rule-based only, drop low-confidence results
dp classify --input cf78c178 --mode rule_based --threshold 0.65

# Full embedding pass (all columns go through the model)
dp classify --input cf78c178 --mode embedding

# Override specific columns, let the rest be classified normally
dp classify --input cf78c178 --mode hybrid \
  --overrides '{"status": "categorical_ordinal", "y": "target_label"}'

# Use a full artifact ID instead of a run ID
dp classify --input cf78c178_ingest_output --mode hybrid
```
After a successful run, a classification table is printed and the artifact ID is shown:
```
[OK] Classify complete in 38.8s - 7 columns classified
     Artifact: f9374114_classify_output
     Columns: 7
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Column       ┃ Semantic Type        ┃ Confidence ┃ Source     ┃ Overridden ┃ Reasoning                                      ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ id           │ identifier           │ 0.65       │ rule_based │ no         │ column name matches identifier pattern         │
│ product_name │ categorical_nominal  │ 0.70       │ embedding  │ no         │ embedding similarity 0.263 to categorical_…    │
│ category     │ categorical_nominal  │ 0.70       │ embedding  │ no         │ embedding similarity 0.422 to categorical_…    │
│ price        │ numerical_continuous │ 0.68       │ rule_based │ no         │ column name matches financial numeric pattern  │
│ quantity     │ numerical_discrete   │ 0.65       │ rule_based │ no         │ column name matches discrete count pattern     │
│ status       │ categorical_nominal  │ 0.70       │ embedding  │ no         │ embedding similarity 0.286 to categorical_…    │
│ created_at   │ datetime             │ 0.70       │ rule_based │ no         │ polars dtype is Datetime(…)                    │
└──────────────┴──────────────────────┴────────────┴────────────┴────────────┴────────────────────────────────────────────────┘
```
Confidence colour coding
The Confidence column is colour-coded in the terminal:
green ≥ 0.70 ·
yellow 0.55 – 0.69 ·
red < 0.55.
Overridden columns always show confidence 1.00 and source override.
embedding and hybrid modes
pip install "datapill[ml]" required (~3.5 GB, includes PyTorch + sentence-transformers).
On first use, all-MiniLM-L6-v2 (~90 MB) is downloaded from Hugging Face Hub.
Set HF_TOKEN to enable higher rate limits. Subsequent runs load from local cache.
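For example, set the token in the shell before the first classify run (the value below is a placeholder):

```bash
export HF_TOKEN=hf_your_token_here   # placeholder - substitute your own token
dp classify --input cf78c178 --mode hybrid
```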
Columns classified as numerical_continuous or numerical_discrete
are safe candidates for impute_mean, clip_iqr, and
standard_scaler. Columns classified as identifier,
datetime, or text_freeform should generally be excluded from
numeric steps.
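A short script can turn the classify artifact into the column lists for a preprocess pipeline. The sketch below assumes the artifact is a JSON file in the store and that it exposes per-column entries with name and semantic_type fields - those field names are illustrative, so check the actual artifact for the exact structure.

```python
import json

# Hypothetical artifact path and field names - adjust to the real classify output.
with open("src/datapill/artifacts/f9374114_classify_output.json") as f:
    result = json.load(f)

NUMERIC = {"numerical_continuous", "numerical_discrete"}
numeric_cols = [c["name"] for c in result["columns"] if c["semantic_type"] in NUMERIC]

# Feed the numeric columns into a pipeline skeleton for dp preprocess.
pipeline = {"steps": [{"type": "impute_mean", "scope": {"columns": numeric_cols}}]}
with open("pipeline.json", "w") as f:
    json.dump(pipeline, f, indent=2)
```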
dp run
Runs an ingest followed immediately by a profile in a single command, driven by a JSON config file.
Config file format
{
"source": "postgresql",
"connector": {
"host": "localhost", "port": 5433,
"database": "testdb", "user": "testuser", "password": "testpass"
},
"query": { "table": "orders" },
"ingest": { "batch_size": 10000 },
"profile": {
"mode": "full", "sample_strategy": "random",
"sample_size": 10000, "correlation": "pearson"
}
}
```bash
dp run tests/fixtures/run_config_pg.json
dp run tests/fixtures/run_config_pg.json --out /tmp/dp_artifacts
```
Walkthrough - PostgreSQL end-to-end
```bash
# 1. Verify connection
dp connector test --source postgresql --config tests/fixtures/pg_config.json
dp connector schema --source postgresql --config tests/fixtures/pg_config.json --table orders

# 2. Ingest
dp ingest --source postgresql --config tests/fixtures/pg_config.json --table orders
# -> Run ID: 72e0548a

# 3. Profile
dp profile --input 72e0548a --sample-strategy random --sample-size 10000

# 4. Preprocess
dp preprocess \
  --input 72e0548a \
  --pipeline tests/fixtures/preprocess_pipeline.json \
  --checkpoint
# -> Run ID: f1662de6

# 5a. Export to Parquet
dp export --input f1662de6 --format parquet --out-path output/result.parquet

# 5b. Or write back to PostgreSQL
dp export --input f1662de6 --format parquet \
  --connector tests/fixtures/pg_write_config.json \
  --write-mode upsert --primary-keys id
```
Walkthrough - MySQL end-to-end
```bash
dp connector test --source mysql --config tests/fixtures/mysql_config.json
dp connector schema --source mysql --config tests/fixtures/mysql_config.json --table products

dp ingest --source mysql --config tests/fixtures/mysql_config.json --table products
# -> Run ID: d7859e9e

dp profile --input d7859e9e

dp export --input d7859e9e --format parquet \
  --connector tests/fixtures/mysql_write_config.json --write-mode replace
```
Walkthrough - S3 / MinIO
```bash
dp connector test --source s3 --config tests/fixtures/s3_config.json

dp connector upload --source s3 --config tests/fixtures/s3_config.json \
  --src-path tests/fixtures/data.csv --dest-url s3://testbucket/input/data.csv

dp connector list --source s3 --config tests/fixtures/s3_config.json --prefix input/

dp ingest --source s3 --config tests/fixtures/s3_config.json --url s3://testbucket/input/data.csv

dp export --input <run_id> --format parquet --connector tests/fixtures/s3_write_config.json

dp connector download --source s3 --config tests/fixtures/s3_config.json \
  --url s3://testbucket/output/processed.parquet --out-path output/processed.parquet
```
Walkthrough - Kafka
GroupCoordinatorNotAvailableError warnings are normal in single-node KRaft mode and do not affect ingestion results.

```bash
docker compose -f docker-compose.test.yml exec -T kafka \
  //opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic test-topic --partitions 1 --replication-factor 1

dp connector produce --source kafka \
  --config tests/fixtures/kafka_config.json \
  --topic test-topic \
  --file tests/fixtures/data.csv

dp ingest --source kafka \
  --config tests/fixtures/kafka_config.json \
  --topic test-topic \
  --max-records 100
```
Walkthrough - REST API
Do not include a leading / in --endpoint. Write posts instead of /posts.

```bash
dp connector test --source rest --config tests/fixtures/rest_config.json
dp connector schema --source rest --config tests/fixtures/rest_config.json
dp ingest --source rest --config tests/fixtures/rest_config.json --endpoint posts
dp ingest --source rest --config tests/fixtures/rest_pagination_config.json --endpoint products
```
Walkthrough - classify after ingest
Run dp classify right after ingest to understand the semantic shape of
your dataset before designing a preprocess pipeline. Use the results to decide which
columns to include in each step.
```bash
# 1. Ingest
dp ingest --source postgresql --config tests/fixtures/pg_config.json --table orders
# -> Run ID: cf78c178

# 2. Classify - hybrid mode (recommended)
dp classify --input cf78c178 --mode hybrid
# -> Artifact: f9374114_classify_output
# price            -> numerical_continuous (safe: impute_mean, clip_iqr, standard_scaler)
# quantity         -> numerical_discrete   (safe: impute_median)
# category, status -> categorical_nominal  (skip numeric steps)
# id               -> identifier           (exclude from all transformations)
# created_at       -> datetime             (exclude from numeric steps)

# 2b. Rule-based only - instant, no model required
dp classify --input cf78c178 --mode rule_based --threshold 0.65

# 2c. Override a column when you know better than the classifier
dp classify --input cf78c178 --mode hybrid \
  --overrides '{"status": "categorical_ordinal"}'

# 3. Preprocess - informed by classify output above
dp preprocess \
  --input cf78c178 \
  --pipeline tests/fixtures/preprocess_pipeline.json \
  --dry-run

dp preprocess \
  --input cf78c178 \
  --pipeline tests/fixtures/preprocess_pipeline.json
# -> Run ID: c4ccb55d

# 4. Export
dp export --input c4ccb55d --format parquet \
  --out-path output/result.parquet
```
Config File Reference
PostgreSQL - pg_config.json
{ "host": "localhost", "port": 5433, "database": "testdb", "user": "testuser", "password": "testpass" }
PostgreSQL write - pg_write_config.json
{ "source": "postgresql", "host": "localhost", "port": 5433,
"database": "testdb", "user": "testuser", "password": "testpass",
"table": "orders_processed" }
MySQL - mysql_config.json
{ "host": "localhost", "port": 3307, "database": "testdb", "user": "testuser", "password": "testpass" }
S3 / MinIO - s3_config.json
{ "endpoint_url": "http://localhost:9000", "aws_access_key_id": "minioadmin",
"aws_secret_access_key": "minioadmin", "region": "us-east-1", "bucket": "testbucket" }
Kafka - kafka_config.json
{ "bootstrap_servers": "localhost:9092", "value_format": "json" }
Write Modes
All three write modes require the target table to already exist. The export pipeline does not create tables automatically.
Setup - create the target table first
```bash
docker compose -f docker-compose.test.yml exec -T postgres \
  psql -U testuser -d testdb \
  -c "
    CREATE TABLE IF NOT EXISTS orders_processed (
      id INTEGER PRIMARY KEY,
      product_name VARCHAR(100),
      category VARCHAR(50),
      price NUMERIC(10,6),
      quantity NUMERIC(10,6),
      status VARCHAR(20),
      created_at TIMESTAMP
    );
  "
```
Behaviour by mode
| Mode | Behaviour | Requires |
|---|---|---|
| replace | TRUNCATE the target table, then INSERT all rows. | Table must exist. |
| append | INSERT all rows without touching existing data. Fails if any row violates a unique constraint. | Table must exist. No duplicate IDs in source. |
| upsert | INSERT … ON CONFLICT DO UPDATE. Idempotent and safe to re-run. | Primary key or unique constraint on --primary-keys columns. |
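For PostgreSQL, upsert mode behaves roughly like the statement below - shown here as an illustrative sketch against the orders_processed table from the setup above, not the exact SQL the exporter emits:

```sql
-- Illustrative single-row upsert with --primary-keys id
INSERT INTO orders_processed (id, product_name, category, price, quantity, status, created_at)
VALUES (1, 'Product_1', 'Electronics', 512.30, 7, 'pending', NOW())
ON CONFLICT (id) DO UPDATE SET
  product_name = EXCLUDED.product_name,
  category     = EXCLUDED.category,
  price        = EXCLUDED.price,
  quantity     = EXCLUDED.quantity,
  status       = EXCLUDED.status,
  created_at   = EXCLUDED.created_at;
```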
Pipeline Steps
Steps are defined in a JSON pipeline file. Each step has a type, an optional scope.columns list, and an optional params object. If columns is omitted or empty, the step applies to all eligible columns. Numeric steps enforce that target columns are numeric - a TypeError is raised at runtime otherwise.
Missing Values
| Step type | Description | Column types | params |
|---|---|---|---|
| impute_mean | Replace nulls with the column mean. | Numeric | - |
| impute_median | Replace nulls with the column median. | Numeric | - |
| impute_mode | Replace nulls with the most frequent value. | Any | - |
| drop_missing | Drop rows where any of the specified columns are null. | Any | - |
Outliers
| Step type | Description | Column types | params |
|---|---|---|---|
| clip_iqr | Clip values to [Q1 − 1.5×IQR, Q3 + 1.5×IQR]. | Numeric | - |
| clip_zscore | Clip values beyond N standard deviations from the mean. | Numeric | threshold (float, default 3.0) |
{ "type": "clip_zscore", "scope": { "columns": ["price"] }, "params": { "threshold": 2.5 } }
Scaling
| Step type | Description | Column types | params |
|---|---|---|---|
| standard_scaler | Standardise to zero mean and unit variance. Skips columns with std = 0. | Numeric | - |
| minmax_scaler | Scale to [0, 1]. Skips columns where max = min. | Numeric | - |
| robust_scaler | Scale using median and IQR - robust to outliers. Skips columns with IQR = 0. | Numeric | - |
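Scaling steps take no params - for example, applying the outlier-robust scaler to price only:

{ "type": "robust_scaler", "scope": { "columns": ["price"] } }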
Encoding
| Step type | Description | Column types | params |
|---|---|---|---|
| onehot | One-hot encode each category as a new Int8 column named col__value. Drops the original column. | String / Categorical | - |
| ordinal | Map categories to integers. Auto-sorts alphabetically if no order is given. | String / Categorical | order (dict: column -> ordered list of values) |
{
"type": "ordinal",
"scope": { "columns": ["priority", "size"] },
"params": {
"order": {
"priority": ["low", "medium", "high"],
"size": ["S", "M", "L", "XL"]
}
}
}
Structure
| Step type | Description | params |
|---|---|---|
| select_columns | Keep only the specified columns, drop all others. | - (uses scope.columns) |
| drop_columns | Drop the specified columns. | - (uses scope.columns) |
| rename_columns | Rename columns using a mapping. | mapping (dict: old_name -> new_name) |
| cast_dtype | Cast columns to target dtypes. | casts (dict: column -> dtype string) |
| deduplicate | Remove exact duplicate rows. If columns is set, deduplicates on that subset only. | - (uses scope.columns as subset) |
{ "type": "rename_columns", "scope": { "columns": [] }, "params": { "mapping": { "old_name": "new_name", "qty": "quantity" } } }
{
"type": "cast_dtype",
"scope": { "columns": [] },
"params": {
"casts": {
"price": "float64",
"quantity": "int32",
"is_active": "bool",
"created_at": "datetime"
}
}
}
Supported dtype strings: int8 · int16 · int32 · int64 · float32 · float64 · str · bool · date · datetime
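The structural steps that take no params follow the same shape - for example, dropping duplicate rows based on the id column only:

{ "type": "deduplicate", "scope": { "columns": ["id"] } }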
Custom Python
| Step type | Description | params |
|---|---|---|
| custom_python | Run a sandboxed Python function (via RestrictedPython). The function receives a pl.DataFrame and must return a pl.DataFrame. | code (string, Python source) · func (string, function name, default transform) |
{
"type": "custom_python",
"scope": { "columns": [] },
"params": {
"func": "transform",
"code": "def transform(df):
return df.with_columns((pl.col('revenue') / pl.col('units')).alias('avg_price'))"
}
}
pl (Polars) is injected automatically. For untrusted code, use the Docker sandbox backend instead.
Step conflict warnings
The pipeline automatically detects and warns about common mistakes before execution:
| Pattern | Warning |
|---|---|
| impute_* followed by drop_missing on the same column | drop_missing is redundant - column was already imputed |
| clip_iqr followed by clip_zscore on the same column | overlapping outlier clipping steps detected |
| standard_scaler / minmax_scaler / robust_scaler without a prior impute on the same column | scaler may produce NaN - add impute before scaling |
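For example, a pipeline like this one should trigger the first warning, since price is imputed and then checked for missing values anyway:

{
  "steps": [
    {"type": "impute_mean", "scope": {"columns": ["price"]}},
    {"type": "drop_missing", "scope": {"columns": ["price"]}}
  ]
}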
Semantic Types
Full reference for all types returned by dp classify.
| Semantic Type | Description | Typical columns |
|---|---|---|
| identifier | Unique or near-unique key - not a model feature | id, uuid, user_id |
| numerical_continuous | Real-valued measurement or financial amount | price, weight, temperature |
| numerical_discrete | Whole-number count or frequency | quantity, num_orders, visits |
| categorical_nominal | Unordered categories with no numeric meaning | category, status, country |
| categorical_ordinal | Ordered categories (low -> medium -> high) | priority, severity, grade |
| text_freeform | Unstructured natural language text | description, comment, review |
| text_structured | Formatted string following a known pattern | email, phone, url, zip |
| datetime | Date, time, or timestamp | created_at, born, expires_at |
| boolean | Binary flag or indicator | is_active, has_verified, deleted |
| geospatial | Geographic coordinate or location field | latitude, longitude, city |
| embedding | Dense vector representation | embedding, vec |
| target_label | Supervised learning target or ground truth | target, label, y |
| unknown | Could not be classified with sufficient confidence | - (use --overrides to pin manually) |