From contracts to trusted data
Without clear agreements, data pipelines rely on tribal knowledge and manual checks.
The result: error-prone pipelines, slow feedback, and little governance.
dc43 is a thin wrapper around Spark that enforces contracts and records lineage.
# Every model class used below must be imported; importing only
# OpenDataContractStandard leaves the other four names undefined.
from open_data_contract_standard.model import (
    DataQuality,
    Description,
    OpenDataContractStandard,
    SchemaObject,
    SchemaProperty,
)

# Declare the "Orders" contract in code: a single schema object named
# "orders" whose five columns are all required.
contract = OpenDataContractStandard(
    version="0.1.0",
    kind="DataContract",
    apiVersion="3.0.2",
    id="test.orders",
    name="Orders",
    description=Description(usage="Orders facts"),
    schema=[
        SchemaObject(
            name="orders",
            properties=[
                SchemaProperty(name="order_id", physicalType="bigint", required=True),
                SchemaProperty(name="customer_id", physicalType="bigint", required=True),
                SchemaProperty(name="order_ts", physicalType="timestamp", required=True),
                SchemaProperty(name="amount", physicalType="double", required=True),
                SchemaProperty(
                    name="currency",
                    physicalType="string",
                    required=True,
                    # Enum quality rule: only these two currency codes are allowed.
                    quality=[DataQuality(rule="enum", mustBe=["EUR", "USD"])],
                ),
            ],
        )
    ],
)
{
"version": "1.1.0", "kind": "DataContract", "apiVersion": "3.0.2",
"id": "orders",
"name": "Orders",
"description": {"usage": "Sample orders contract"},
"status": "active",
"servers": [ { "server": "local", "type": "filesystem", "path": "data/orders", "format": "json" } ],
"schema": [
{
"name": "orders",
"properties": [
{"name": "order_id", "physicalType": "integer", "required": true, "unique": true},
{"name": "customer_id", "physicalType": "integer", "required": true},
{"name": "order_ts", "physicalType": "string", "required": true},
{
"name": "amount", "physicalType": "double", "required": true,
"quality": [ { "mustBeGreaterThan": 0 } ]
},
{"name": "currency", "physicalType": "string", "required": true}
]
}
]
}
# Contract-aware read of the "orders" contract.
# The contract service client is backed by `store`; the locator is pinned to
# the "latest" dataset version.
contract_client = LocalContractServiceClient(store)
latest_locator = ContractVersionLocator(dataset_version="latest")

# NOTE(review): the unpacking assumes the call returns a (DataFrame, status)
# pair — confirm against the dc43 version in use.
orders_df, status = read_with_contract(
    spark,
    contract_id="orders",
    contract_service=contract_client,
    governance_service=governance,
    dataset_locator=latest_locator,
)
{
"status": "fail",
"violations": [
{"row": 42, "field": "amount", "message": "amount must be > 0"}
]
}
# Hand-written check: read the raw JSON, run validate_schema on it, and abort
# if it reports anything.
df = spark.read.json("orders/latest")
errors = validate_schema(df)
if errors:
    # The body of the `if` must be indented; a truthy result is raised as-is.
    raise ValueError(errors)
# Join orders to customers on customer_id, then add a `total` column computed
# as the order amount multiplied by 1.2.
enriched = (
    orders_df.join(customers_df, "customer_id")
    .withColumn("total", orders_df.amount * 1.2)
)
[
{"id": "1", "total": 12.0},
{"id": "2", "total": -6.0}
]
The negative total in the second row will trip the contract checks when the data is written.
# Contract-aware write of the enriched DataFrame against the "orders" contract.
# The contract service client is backed by `store`; the locator is pinned to
# the "latest" dataset version.
contract_client = LocalContractServiceClient(store)
latest_locator = ContractVersionLocator(dataset_version="latest")

# NOTE(review): the unpacking assumes the call returns a (result, status)
# pair — confirm against the dc43 version in use.
result, status = write_with_contract(
    enriched,
    contract_id="orders",
    contract_service=contract_client,
    governance_service=governance,
    dataset_locator=latest_locator,
)
{
"metrics": {"row_count": 2, "negative_total": 1},
"status": "block"
}
# Manual approach: compute the metrics by hand with two separate Spark
# count() actions — one over all rows, one over rows with a negative total.
row_count = enriched.count()
negatives = enriched.where("total < 0").count()  # where() is an alias of filter()
# Rebind `status` to whatever attach_failed_expectations returns for this
# contract and the current status.
# NOTE(review): presumably this enriches the status with failing examples —
# confirm against the helper.
status = attach_failed_expectations(contract, status)
[
{
"expectation": "amount > 0",
"examples": [{"id": "2", "amount": -5.0}]
}
]
# Register version 1 of "orders_enriched" with the status and metrics gathered
# above, then persist the full list of records.
orders_record = DatasetRecord(
    name="orders_enriched",
    version=1,
    status=status.status,
    metrics=result.metrics,
)
records.append(orders_record)
save_records(records)
[
{"version": 1, "row_count": 2, "status": "fail"}
]
# v1.0.0
{"fields": [{"name": "amount", "type": "double"}]}
# v1.1.0
{"fields": [{"name": "amount", "type": "double", "nullable": true}]}
pip install dc43 → build your first contract today.