dc43 Pipeline Demo

From contracts to trusted data

Why Data Contracts?

Without clear agreements, data pipelines rely on tribal knowledge and manual checks.

Manual Step 1: Infer Schema

  • Developers inspect files to guess structure
  • Ad-hoc scripts enforce types
  • Evolution requires email coordination
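In practice, the "inference" is often a throwaway script like this sketch (pure Python, illustrative only; real jobs typically lean on Spark's schema inference):

```python
import json

def guess_schema(records):
    """Naive type inference over sample records -- the kind of ad-hoc
    script this manual step usually produces."""
    schema = {}
    for record in records:
        for field, value in record.items():
            inferred = type(value).__name__
            # Conflicting guesses across records silently widen to "string".
            if schema.get(field, inferred) != inferred:
                inferred = "string"
            schema[field] = inferred
    return schema

sample = [json.loads(line) for line in [
    '{"order_id": 1, "amount": 10.5}',
    '{"order_id": 2, "amount": "N/A"}',  # one dirty value flips the guess
]]
print(guess_schema(sample))  # {'order_id': 'int', 'amount': 'string'}
```

One dirty record is enough to change the guessed type, and nothing records that this happened.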

Manual Step 2: Validate Inputs

  • Custom validators scattered across jobs
  • Late discovery of wrong or missing fields
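Those scattered validators tend to look like this sketch (field names follow the orders example; illustrative only):

```python
# Copies of this table drift apart across jobs -- that is the problem.
REQUIRED = {"order_id": int, "customer_id": int, "amount": float}

def validate_row(row):
    """One of many near-identical validators scattered across jobs.
    Returns a list of error strings; an empty list means the row passed."""
    errors = []
    for field, expected in REQUIRED.items():
        if field not in row:
            errors.append(f"missing field: {field}")
        elif not isinstance(row[field], expected):
            errors.append(f"{field}: expected {expected.__name__}")
    return errors

print(validate_row({"order_id": 1, "amount": 9.99}))
# ['missing field: customer_id']
```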

Manual Step 3: Compute Metrics

  • Separate jobs count rows and nulls
  • Hard to compare across runs
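The counting itself is trivial; the pain is that it lives in a separate job whose output is hard to line up across runs. A sketch:

```python
def compute_metrics(rows):
    """A separate metrics job: count rows and nulls per field.
    Results usually land in logs, so comparing runs means grepping."""
    metrics = {"row_count": len(rows)}
    for row in rows:
        for field, value in row.items():
            if value is None:
                key = f"null_{field}"
                metrics[key] = metrics.get(key, 0) + 1
    return metrics

rows = [
    {"order_id": 1, "amount": 10.0},
    {"order_id": 2, "amount": None},
]
print(compute_metrics(rows))  # {'row_count': 2, 'null_amount': 1}
```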

Manual Step 4: Track Versions

  • Spreadsheet or wiki for dataset history
  • No link between code and documentation

Manual Step 5: Communicate Changes

  • Emails and meetings to share updates
  • Consumers discover breaking changes too late

Manual Pipeline Pain

Error-prone steps, slow feedback loops, and little governance.

Enter dc43

A thin wrapper around Spark that enforces contracts and records lineage.

1. Define Data Contract

from open_data_contract_standard.model import (
    DataQuality,
    Description,
    OpenDataContractStandard,
    SchemaObject,
    SchemaProperty,
)

contract = OpenDataContractStandard(
  version="0.1.0",
  kind="DataContract",
  apiVersion="3.0.2",
  id="orders",
  name="Orders",
  description=Description(usage="Orders facts"),
  schema=[
    SchemaObject(
      name="orders",
      properties=[
        SchemaProperty(name="order_id", physicalType="bigint", required=True),
        SchemaProperty(name="customer_id", physicalType="bigint", required=True),
        SchemaProperty(name="order_ts", physicalType="timestamp", required=True),
        SchemaProperty(
          name="amount",
          physicalType="double",
          required=True,
          quality=[DataQuality(mustBeGreaterThan=0)],
        ),
        SchemaProperty(
          name="currency",
          physicalType="string",
          required=True,
          quality=[DataQuality(rule="enum", mustBe=["EUR", "USD"])],
        ),
      ],
    )
  ],
)

Contract JSON

{
  "version": "0.1.0", "kind": "DataContract", "apiVersion": "3.0.2",
  "id": "orders",
  "name": "Orders",
  "description": {"usage": "Sample orders contract"},
  "status": "active",
  "servers": [ { "server": "local", "type": "filesystem", "path": "data/orders", "format": "json" } ],
  "schema": [
    {
      "name": "orders",
      "properties": [
        {"name": "order_id", "physicalType": "integer", "required": true, "unique": true},
        {"name": "customer_id", "physicalType": "integer", "required": true},
        {"name": "order_ts", "physicalType": "string", "required": true},
        {
          "name": "amount", "physicalType": "double", "required": true,
          "quality": [ { "mustBeGreaterThan": 0 } ]
        },
        {"name": "currency", "physicalType": "string", "required": true}
      ]
    }
  ]
}
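A quick way to see what the contract demands is to parse the JSON and collect the required fields per schema object. A sketch with a trimmed copy of the contract inlined (plain Python, not the full ODCS validation):

```python
import json

# Trimmed copy of the orders contract, inlined for illustration.
contract = json.loads("""
{
  "version": "0.1.0", "kind": "DataContract", "apiVersion": "3.0.2",
  "id": "orders",
  "schema": [{"name": "orders", "properties": [
    {"name": "order_id", "physicalType": "integer", "required": true},
    {"name": "amount", "physicalType": "double", "required": true}
  ]}]
}
""")

# Collect every required field per schema object -- a quick sanity pass.
required = {
    obj["name"]: [p["name"] for p in obj["properties"] if p.get("required")]
    for obj in contract["schema"]
}
print(required)  # {'orders': ['order_id', 'amount']}
```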

2. Read with Governance

orders_df, status = read_with_governance(
    spark,
    GovernanceSparkReadRequest(
        context={
            "contract": {
                "contract_id": "orders",
                "contract_version": "0.1.0",
            }
        },
        dataset_locator=ContractVersionLocator(dataset_version="latest"),
    ),
    governance_service=governance,
)

Read Status

{
  "status": "fail",
  "violations": [
    {"row": 42, "field": "amount", "message": "amount must be > 0"}
  ]
}
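A pipeline can act on this status immediately rather than letting bad rows flow downstream. A minimal sketch, with the status dict mirroring the example above and a ContractViolation exception of our own (not part of dc43):

```python
class ContractViolation(Exception):
    """Raised when a governed read reports failed expectations."""

def ensure_ok(status):
    """Fail fast on a non-ok status instead of propagating bad rows.
    ContractViolation is our own exception type, not a dc43 API."""
    if status["status"] != "ok":
        details = "; ".join(v["message"] for v in status.get("violations", []))
        raise ContractViolation(f"read blocked: {details}")
    return status

try:
    ensure_ok({
        "status": "fail",
        "violations": [{"row": 42, "field": "amount",
                        "message": "amount must be > 0"}],
    })
except ContractViolation as exc:
    print(exc)  # read blocked: amount must be > 0
```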

Manual Alternative

df = spark.read.json("orders/latest")
errors = validate_schema(df)  # hand-rolled checker, duplicated per job
if errors:
    raise ValueError(errors)

3. Transform with Spark

enriched = (
    orders_df.join(customers_df, "customer_id")
    .withColumn("total", orders_df.amount * 1.2)
)

Transformation Output

[
  {"id": "1", "total": 12.0},
  {"id": "2", "total": -6.0}
]

Negative totals will trigger contract checks later.
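The rule the write-time check will apply can be previewed by hand on the sample output above (plain Python, illustrative):

```python
rows = [
    {"id": "1", "total": 12.0},
    {"id": "2", "total": -6.0},
]

# Preview of the positivity rule the contract enforces on write:
# any non-positive total will be flagged.
flagged = [row for row in rows if row["total"] <= 0]
print(flagged)  # [{'id': '2', 'total': -6.0}]
```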

4. Write with Contract

result, status = write_with_contract(
    enriched,
    contract_id="orders",
    contract_service=LocalContractServiceClient(store),
    governance_service=governance,
    dataset_locator=ContractVersionLocator(dataset_version="latest")
)

Write Result

{
  "metrics": {"row_count": 2, "negative_total": 1},
  "status": "block"
}

Metrics vs Manual

# manual
row_count = enriched.count()
negatives = enriched.filter("total < 0").count()

5. Inspect Violations

status = attach_failed_expectations(
    contract,
    status,
)

Violation Report

[
  {
    "expectation": "amount > 0",
    "examples": [{"id": "2", "amount": -5.0}]
  }
]

6. Track Dataset Versions

records = load_records()
latest = records[-1]
print(latest.dataset_name, latest.dataset_version, latest.status)

Version History

[
  {"version": 1, "row_count": 2, "status": "fail"}
]
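With the history recorded as structured records, a consumer can pin to the newest passing version programmatically. A sketch over the history shown above (latest_passing is our own helper, not a dc43 API):

```python
history = [
    {"version": 1, "row_count": 2, "status": "fail"},
]

def latest_passing(history):
    """Newest version whose checks passed -- the one a consumer would
    pin to. Returns None while no version is trustworthy yet."""
    passing = [entry for entry in history if entry["status"] == "pass"]
    return max(passing, key=lambda e: e["version"]) if passing else None

print(latest_passing(history))  # None: the only recorded version failed
```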

Logic Flow

(Diagram placeholders: read-write sequence, read path, write path.)

Pipeline Comparison

Manual

  • Separate scripts for validation and metrics
  • Manual tracking of versions
  • Inconsistent rules

dc43

  • Contracts enforce schema and rules
  • Metrics captured on write
  • History recorded automatically

With vs Without Contracts

  • Without: implicit schemas, late errors, manual docs
  • With: versioned definitions, early validation, consistent governance

Benefits for Data Engineers

  • Less boilerplate Spark code
  • Early detection of issues
  • Automatic metrics for monitoring

Benefits for Governance

  • Traceable changes across versions
  • Clear contracts between producers and consumers
  • Audit-friendly metrics and violations

Contract Evolution

# v1.0.0
{"properties": [{"name": "amount", "physicalType": "double", "required": true}]}
# v1.1.0
{"properties": [{"name": "amount", "physicalType": "double", "required": false}]}
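Under semantic versioning, the bump above is a minor, non-breaking change. A sketch of the compatibility rule:

```python
def parse(version):
    """Split "MAJOR.MINOR.PATCH" into a comparable tuple of ints."""
    return tuple(int(part) for part in version.split("."))

def is_compatible(old, new):
    """Semver reading of a contract bump: same major version and not a
    downgrade means existing consumers keep working."""
    return parse(new)[0] == parse(old)[0] and parse(new) >= parse(old)

print(is_compatible("1.0.0", "1.1.0"))  # True: additive minor bump
print(is_compatible("1.1.0", "2.0.0"))  # False: breaking major bump
```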

Summary

  • Contracts define, validate, and document data
  • dc43 automates metrics and versioning
  • Manual steps shrink, reliability grows

Get Started

pip install dc43 → build your first contract today.