How-To Guide
From export analysis to signed governance artifacts. Six target platforms, airgapped operation, full audit trails.
```shell
pip install cepheus-benza
cepheus-benza --version
```
```toml
[analyze]
path = "./exports/"
output = "./analysis.pdf"

[convert]
path = "./exports/"
license = "./license.pdf"
output = "./output/"
target = "dbt"
dialect = "snowflake"
```
Every command reads from spec.toml. CLI flags override spec values when provided. Relative paths resolve from the spec file's directory.
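The path-resolution rule can be sketched in a few lines of Python (an illustration of the behavior described above, not the tool's own code; `resolve_spec_path` is a hypothetical helper):

```python
from pathlib import Path

def resolve_spec_path(spec_file: str, value: str) -> Path:
    # Relative paths in spec.toml resolve from the spec file's directory,
    # not from the shell's current working directory.
    p = Path(value)
    return p if p.is_absolute() else (Path(spec_file).parent / p).resolve()

print(resolve_spec_path("/project/spec.toml", "./exports/"))  # /project/exports on POSIX
```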
Scan PowerCenter XML exports to assess automation potential before committing to a conversion.
```toml
[analyze]
path = "./exports/"
output = "./analysis.pdf"
summary_only = false
no_recursion = false
```
Submit the generated analysis PDF to benza@cepheus.in. The PDF embeds cryptographic fingerprints of your exports. Cepheus replies with a license file.
Why checksums? The license is tied to your export files as they existed when analyzed. Modified files won't match and conversion is blocked — preventing accidental conversion of untested exports.
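The checksum binding can be illustrated with the standard library (a sketch of the general idea; the exact fingerprint scheme Cepheus uses is not documented here):

```python
import hashlib

def sha256_fingerprint(data: bytes) -> str:
    # Hash the export's bytes; any modification changes the digest,
    # so a license bound to the old digest no longer matches.
    return hashlib.sha256(data).hexdigest()

original = sha256_fingerprint(b"<POWERMART>...</POWERMART>")
modified = sha256_fingerprint(b"<POWERMART>... </POWERMART>")
print(original != modified)  # True: any edit, even whitespace, breaks the match
```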
```toml
[verify_license]
path = "./exports/"
license = "./license.pdf"
```
```
VALID repo_export.xml (sha256: a1b2c3...)
VALID archive_export.xml (sha256: f6e5d4...)
```
MISMATCH = file modified since analysis (re-run analyze). UNLICENSED = not in this license.
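The three outcomes reduce to a lookup plus a hash comparison. A minimal sketch, assuming the license maps file names to the SHA-256 digests recorded at analysis time (field names are assumptions):

```python
def verify(file_name: str, actual_sha256: str, licensed: dict) -> str:
    # licensed maps file name -> sha256 recorded at analysis time.
    if file_name not in licensed:
        return "UNLICENSED"   # not covered by this license
    if licensed[file_name] != actual_sha256:
        return "MISMATCH"     # modified since analysis: re-run analyze
    return "VALID"

lic = {"repo_export.xml": "a1b2c3"}
print(verify("repo_export.xml", "a1b2c3", lic))  # VALID
print(verify("repo_export.xml", "zzz", lic))     # MISMATCH
print(verify("other_export.xml", "a1b2c3", lic)) # UNLICENSED
```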
```toml
[convert]
path = "./exports/"
license = "./license.pdf"
output = "./output/"
target = "dbt"            # dbt | pyspark | snowpark | glue | adf | dataform
dialect = "snowflake"     # snowflake | bigquery | redshift | databricks | synapse | generic
orchestrator = "airflow"  # python | airflow | dagster | prefect (dbt only)
```
```toml
# PySpark
[convert]
target = "pyspark"
workspace_path = "/Workspace/benza"
```

```toml
# Snowpark
[convert]
target = "snowpark"
warehouse = "COMPUTE_WH"
```

```toml
# AWS Glue
[convert]
target = "glue"
glue_database = "benza_db"
```

```toml
# Custom rules
[convert]
rules_dir = "./my_custom_rules"
```
See Custom Rules for how to write rule classes.
Structure varies by target. All produce a reports/ directory with signed governance artifacts.
dbt:

```
output/
├── dbt_project.yml
├── models/          <model>.sql + sources.yml + .yml docs
├── macros/          <mapplet>.sql
├── snapshots/       SCD Type 2 snapshot configs
├── orchestration/   Airflow / Dagster / Prefect / Python
├── validation/      Reconciliation queries
└── reports/         audit_trail.json + .pdf | equivalence + .pdf | risk_scores.json
```

PySpark:

```
output/
├── jobs/            PySpark job scripts
├── notebooks/       Databricks notebook cells
├── orchestration/   Databricks Workflow JSON
└── reports/
```

Snowpark:

```
output/
├── procedures/      Snowpark stored procedures
├── orchestration/   Snowflake Task DAG (CREATE TASK)
└── reports/
```

AWS Glue:

```
output/
├── jobs/            Glue PySpark + GlueContext
├── orchestration/   Step Functions | Glue Workflow | Airflow
└── reports/
```

ADF:

```
output/
├── dataflows/       Mapping Data Flow JSON
├── pipelines/       ADF Pipeline JSON
├── datasets/        + linkedServices/ + arm_template.json
└── reports/
```

Dataform:

```
output/
├── definitions/     SQLX models
├── includes/        JavaScript macros
├── workflow_settings.yaml
└── reports/
```
About the PDFs: Every PDF is PDF/A-3 with JSON data embedded as an attachment. One file for humans, one for machines. HMAC integrity metadata detects tampering.
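HMAC-based tamper detection works roughly like this (stdlib sketch; the tool's actual key management and payload layout are internal and not documented here):

```python
import hashlib
import hmac
import json

def hmac_sign(payload: dict, key: bytes) -> str:
    # Canonicalize the JSON so the same data always produces the same tag.
    body = json.dumps(payload, sort_keys=True).encode()
    return hmac.new(key, body, hashlib.sha256).hexdigest()

def hmac_verify(payload: dict, key: bytes, tag: str) -> bool:
    # compare_digest avoids timing side channels.
    return hmac.compare_digest(hmac_sign(payload, key), tag)

key = b"example-key"
report = {"mapping": "m_load_orders", "level": "semantic_equivalent"}
tag = hmac_sign(report, key)
report["level"] = "exact_equivalent"  # tamper with the embedded JSON
print(hmac_verify(report, key, tag))  # False: tampering detected
```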
| Level | Meaning |
|---|---|
| exact_equivalent | Functionally identical |
| semantic_equivalent | Same result, different syntax |
| behavioral_equivalent | Same in normal cases; edge cases may differ |
| approximation | Best-effort. Manual review required |
| stub | Not translated. Manual implementation required |
Local Flask dashboard for reviewing conversion results.
```toml
[serve]
output_dir = "./output/"
port = 8080
host = "127.0.0.1"
```
From the Workbench, each mapping can be Approved or Flagged (with notes). Decisions are recorded in reports/approvals.jsonl — append-only, immutable, timestamped. Part of the governance evidence trail.
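An append-only JSONL decision log can be sketched as follows (illustrative, with a hypothetical `record_decision` helper; field names are assumptions):

```python
import json
import tempfile
import time
from pathlib import Path

def record_decision(path: Path, mapping: str, decision: str, note: str = "") -> None:
    # Append-only JSON Lines: one decision per line, never rewritten in place.
    entry = {"mapping": mapping, "decision": decision, "note": note,
             "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())}
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")

log = Path(tempfile.mkdtemp()) / "approvals.jsonl"
record_decision(log, "m_load_orders", "approved")
record_decision(log, "m_load_items", "flagged", note="review lookup override")
entries = [json.loads(line) for line in log.read_text().splitlines()]
print(len(entries))  # 2
```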
Optional. Runs validation queries against source and target databases, compares results.
```toml
[reconcile]
output_dir = "./output/"
source_conn = "postgresql://user:pass@source-host:5432/db"
target_conn = "snowflake://user:pass@account/db/schema?warehouse=wh"
source_schema = "operational"
target_schema = "analytics"
tolerance = 0.0
timeout = 120
```
| Warehouse | Package |
|---|---|
| Snowflake | snowflake-sqlalchemy |
| BigQuery | sqlalchemy-bigquery |
| Redshift | sqlalchemy-redshift |
| PostgreSQL | psycopg2-binary |
| Databricks | databricks-sql-connector |
| Status | Meaning |
|---|---|
| green | All checks passed within tolerance |
| yellow | Passed within tolerance but not exact |
| red | One or more checks failed |
| error | Query execution error |
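The status logic amounts to a per-check tolerance comparison. A sketch, assuming a relative-difference tolerance (the tool's exact definition may differ):

```python
def check_status(source: float, target: float, tolerance: float) -> str:
    if source == target:
        return "green"  # exact match
    denom = max(abs(source), abs(target), 1e-12)
    rel = abs(source - target) / denom
    return "yellow" if rel <= tolerance else "red"

print(check_status(1000, 1000, 0.0))   # green
print(check_status(1000, 1001, 0.01))  # yellow: within tolerance, not exact
print(check_status(1000, 1100, 0.01))  # red
```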
Produced on every conversion run. Cannot be disabled.
```toml
[audit_report]
output_dir = "./output/"
export = ""          # leave blank for all, or set to a mapping name
printable = false
```
| Platform | Target value | Output | Orchestration |
|---|---|---|---|
| dbt | dbt | SQL models, macros, snapshots | Airflow, Dagster, Prefect, Python |
| PySpark | pyspark | Job scripts, notebooks | Databricks Workflows |
| Snowpark | snowpark | Stored procedures | Snowflake Tasks |
| AWS Glue | glue | Glue PySpark scripts | Step Functions, Glue Workflows, Airflow |
| ADF | adf | Data Flows, Pipelines, ARM | ADF Pipelines |
| Dataform | dataform | SQLX, JS includes | Dataform schedules |
All share the same translation engine, expression grammar, and SCD detection. Governance artifacts are identical across targets.
| Dialect | Value | Highlights |
|---|---|---|
| Generic ANSI | generic | Default |
| Snowflake | snowflake | FLATTEN, TO_TIMESTAMP_NTZ |
| BigQuery | bigquery | DATE_TRUNC, SAFE_DIVIDE |
| Redshift | redshift | GETDATE(), DATEADD |
| Databricks | databricks | DATE_FORMAT, DATEDIFF |
| Synapse | synapse | CONVERT, DATEPART |
Other targets (PySpark, Snowpark, Glue, ADF, Dataform) generate platform-native code directly — no dialect transpilation.
| Orchestrator | Value | Output |
|---|---|---|
| Python | python | Plain Python script |
| Airflow | airflow | DAG with PythonOperator |
| Dagster | dagster | Job with @op definitions |
| Prefect | prefect | Flow with @task definitions |
| Target | Orchestration |
|---|---|
| PySpark | Databricks Workflow JSON |
| Snowpark | Snowflake Task DAG |
| Glue | Step Functions + Glue Workflow + Airflow DAG |
| ADF | ADF Pipeline JSON |
| Dataform | workflow_settings.yaml |
```python
from cepheus_benza.core.rules.base import TranslationRule

class MyLookupRule(TranslationRule):
    priority = 50  # higher priority takes precedence over built-in rules

    def applies_to(self, node) -> bool:
        return node.transformation_type == "Lookup" and "CUSTOM_" in node.name

    def translate(self, node, context):
        # Build and return the translated output for this node
        # (see Custom Rules for the expected return shape).
        ...
```
Place rule files in a directory and set rules_dir in [convert]. Higher priority takes precedence over built-in rules.
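Priority-based rule selection can be pictured as a sort-and-first-match dispatch (a stand-in illustration, not the engine's internals):

```python
class Rule:
    def __init__(self, name, priority, predicate):
        self.name, self.priority, self.predicate = name, priority, predicate

def pick_rule(rules, node) -> str:
    # Higher priority wins; the first applicable rule is used.
    for rule in sorted(rules, key=lambda r: r.priority, reverse=True):
        if rule.predicate(node):
            return rule.name
    raise LookupError("no applicable rule")

rules = [
    Rule("builtin_lookup", 10, lambda n: n["type"] == "Lookup"),
    Rule("my_custom_lookup", 50,
         lambda n: n["type"] == "Lookup" and "CUSTOM_" in n["name"]),
]
print(pick_rule(rules, {"type": "Lookup", "name": "CUSTOM_LKP_X"}))  # my_custom_lookup
print(pick_rule(rules, {"type": "Lookup", "name": "LKP_STD"}))       # builtin_lookup
```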
Environment variables override spec.toml values. Useful for credentials you don't want stored in config files.
| Variable | Spec equivalent |
|---|---|
| CEPHEUS_BENZA_SOURCE_CONN | [reconcile] source_conn |
| CEPHEUS_BENZA_TARGET_CONN | [reconcile] target_conn |
| CEPHEUS_BENZA_OUTPUT_DIRECTORY | [convert] output |
| CEPHEUS_BENZA_LOGGING_LEVEL | — |
| CEPHEUS_BENZA_SMTP_PASSWORD | [notification] smtp_password |
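The override precedence can be sketched with os.environ (illustrative helper, not the tool's code):

```python
import os

def setting(env_var, spec_value=None):
    # Environment wins over spec.toml, so secrets stay out of config files.
    return os.environ.get(env_var, spec_value)

os.environ["CEPHEUS_BENZA_SOURCE_CONN"] = "postgresql://user:***@host/db"
print(setting("CEPHEUS_BENZA_SOURCE_CONN", "value-from-spec"))  # env value wins
print(setting("CEPHEUS_BENZA_TARGET_CONN", "value-from-spec"))  # falls back to spec
```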
Checksums don't match. File was modified after analysis or wrong license. Re-run analyze and request an updated license.
Check the license path in spec.toml. Ensure the file exists and is readable.
Not a valid PowerCenter export. Verify: exported from Repository Manager, UTF-8 or Latin-1 encoding, file not truncated.
Some types produce no output without a connected Target. Check audit trail for stub confidence or disconnected node warnings.
```shell
pip install sqlalchemy <driver-for-your-warehouse>
```
Increase timeout in spec.toml to raise the per-query limit. Verify that output_dir in [serve] matches output in [convert], and that reports/audit_trail.json exists.