Metadata-Version: 2.4
Name: pytest-kafka-contract
Version: 0.1.0
Summary: A pytest plugin and CLI for validating Kafka JSON and Avro messages against contracts.
Project-URL: Homepage, https://pypi.org/project/pytest-kafka-contract/
Project-URL: Repository, https://github.com/YOUR_USERNAME/pytest-kafka-contract
Project-URL: Issues, https://github.com/YOUR_USERNAME/pytest-kafka-contract/issues
Project-URL: Changelog, https://github.com/YOUR_USERNAME/pytest-kafka-contract/blob/main/CHANGELOG.md
Author: Dharsan Guruparan
License-Expression: MIT
License-File: LICENSE
Keywords: avro,contract-testing,event-driven,integration-testing,kafka,pytest,qa,schema-registry,sdet
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: Pytest
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Quality Assurance
Classifier: Topic :: Software Development :: Testing
Requires-Python: >=3.10
Requires-Dist: jsonschema>=4.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: pytest>=8.0.0
Requires-Dist: pyyaml>=6.0.0
Requires-Dist: rich>=13.0.0
Requires-Dist: typer>=0.12.0
Provides-Extra: all
Requires-Dist: confluent-kafka>=2.3.0; extra == 'all'
Requires-Dist: fastavro>=1.9.0; extra == 'all'
Requires-Dist: httpx>=0.27.0; extra == 'all'
Provides-Extra: avro
Requires-Dist: fastavro>=1.9.0; extra == 'avro'
Provides-Extra: dev
Requires-Dist: build>=1.2.0; extra == 'dev'
Requires-Dist: confluent-kafka>=2.3.0; extra == 'dev'
Requires-Dist: fastavro>=1.9.0; extra == 'dev'
Requires-Dist: httpx>=0.27.0; extra == 'dev'
Requires-Dist: mypy>=1.10.0; extra == 'dev'
Requires-Dist: pydantic>=2.0.0; extra == 'dev'
Requires-Dist: pytest-cov>=5.0.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.6.0; extra == 'dev'
Requires-Dist: twine>=5.0.0; extra == 'dev'
Requires-Dist: types-pyyaml>=6.0; extra == 'dev'
Provides-Extra: kafka
Requires-Dist: confluent-kafka>=2.3.0; extra == 'kafka'
Provides-Extra: registry
Requires-Dist: httpx>=0.27.0; extra == 'registry'
Description-Content-Type: text/markdown

# pytest-kafka-contract

`pytest-kafka-contract` is a pytest plugin and CLI for validating Kafka event payloads against JSON/YAML contracts, Avro schemas, Schema Registry subjects, and real Kafka messages.

It provides lightweight, pytest-native contract checking for QA automation, SDET, backend, and Kafka teams. It complements Schema Registry by making contract checks easy to run in unit tests, CI jobs, and local developer workflows.

## Why this exists

Kafka messages can drift silently: producers rename fields, remove required data, change types, or publish payloads that consumers cannot handle. This package catches those failures in normal pytest runs before they reach production.

## Features

- JSON payload validation with YAML contracts
- Avro record validation with `.avsc` schemas
- Confluent Schema Registry subject checks
- Real Kafka JSON message validation
- Real Kafka Avro message validation with Confluent wire format
- `kafka_contract` pytest fixture
- CLI commands for local and CI usage
- Markdown and JSON reports

## Install

Basic JSON support:

```bash
pip install pytest-kafka-contract
```

All features:

```bash
pip install "pytest-kafka-contract[all]"
```

Individual extras:

```bash
pip install "pytest-kafka-contract[avro]"
pip install "pytest-kafka-contract[registry]"
pip install "pytest-kafka-contract[kafka]"
```

The base install supports JSON contracts and does not import Kafka, Avro, or Schema Registry clients at import time.

## Beginner Flow

Step 1: Install:

```bash
pip install "pytest-kafka-contract[all]"
```

Step 2: Generate starter files:

```bash
pytest-kafka-contract init
```

Step 3: Validate sample JSON:

```bash
pytest-kafka-contract validate-file contracts/order-created.yaml samples/order-created.json
```

Step 4: Validate sample Avro record:

```bash
pytest-kafka-contract avro-validate-file schemas/order-created.avsc samples/order-created-avro.json
```

Step 5: Write pytest test:

```python
def test_order_created(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "OrderCreated",
            "order": {"order_id": "ord_1", "total": 20.0},
        },
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues
```

Step 6: Run pytest:

```bash
pytest -q
```

Step 7: Optional Schema Registry check:

```bash
pytest-kafka-contract registry-check \
  --registry-url http://localhost:8081 \
  --subject orders.created-value \
  --schema schemas/order-created.avsc
```

Step 8: Optional real Kafka check:

```bash
pytest-kafka-contract kafka-validate-json \
  --bootstrap-servers localhost:19092 \
  --topic orders.created \
  --contract contracts/order-created.yaml \
  --auto-offset-reset earliest
```

## Quick Start: JSON Contract

Create folders:

```bash
mkdir -p contracts samples tests
```

Create `contracts/order-created.yaml`:

```yaml
version: 1
name: order-created-v1
topic: orders.created
message:
  type: object
  required:
    - event_id
    - event_type
    - order
  properties:
    event_id:
      type: string
      nullable: false
    event_type:
      type: string
      const: OrderCreated
      nullable: false
    order:
      type: object
      nullable: false
      required:
        - order_id
        - total
      properties:
        order_id:
          type: string
          nullable: false
        total:
          type: number
          nullable: false
rules:
  allow_extra_fields: false
```
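The contract above maps onto a small set of checks: required keys must be present, each property's `type` must match, `nullable: false` forbids `null`, `const` pins a value, and `allow_extra_fields: false` rejects unknown keys. A rough stdlib-only sketch of those semantics, for intuition (the package's own validator is the source of truth):

```python
# Illustrative only: a stdlib sketch of the checks the contract above
# encodes, assuming rules.allow_extra_fields is false.
TYPES = {"object": dict, "string": str, "number": (int, float), "boolean": bool}


def check(payload: dict, spec: dict, path: str = "$") -> list[str]:
    issues: list[str] = []
    props = spec.get("properties", {})
    for key in spec.get("required", []):
        if key not in payload:
            issues.append(f"{path}.{key}: missing required field")
    for key, value in payload.items():
        if key not in props:
            issues.append(f"{path}.{key}: extra field not allowed")
        elif value is None:
            if not props[key].get("nullable", True):
                issues.append(f"{path}.{key}: null not allowed")
        elif not isinstance(value, TYPES.get(props[key].get("type"), object)):
            issues.append(f"{path}.{key}: expected {props[key].get('type')}")
        elif "const" in props[key] and value != props[key]["const"]:
            issues.append(f"{path}.{key}: expected const {props[key]['const']!r}")
        elif props[key].get("type") == "object":
            issues.extend(check(value, props[key], f"{path}.{key}"))
    return issues


# The same spec as contracts/order-created.yaml, as a Python dict.
SPEC = {
    "type": "object",
    "required": ["event_id", "event_type", "order"],
    "properties": {
        "event_id": {"type": "string", "nullable": False},
        "event_type": {"type": "string", "const": "OrderCreated", "nullable": False},
        "order": {
            "type": "object",
            "nullable": False,
            "required": ["order_id", "total"],
            "properties": {
                "order_id": {"type": "string", "nullable": False},
                "total": {"type": "number", "nullable": False},
            },
        },
    },
}

ok = check({"event_id": "evt_1", "event_type": "OrderCreated",
            "order": {"order_id": "ord_1", "total": 20.0}}, SPEC)
bad = check({"event_id": "evt_1", "event_type": "OrderCreated",
             "order": {"total": 20.0}}, SPEC)
```

With the sample payload, `check` returns no issues; dropping `order_id` reports a missing required field at `$.order.order_id`.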

Create `samples/order-created.json`:

```json
{
  "event_id": "evt_1",
  "event_type": "OrderCreated",
  "order": {
    "order_id": "ord_1",
    "total": 20.0
  }
}
```

Create `tests/test_order_created.py`:

```python
import json


def test_order_created_json(kafka_contract):
    with open("samples/order-created.json", encoding="utf-8") as fh:
        payload = json.load(fh)
    result = kafka_contract.validate_payload(
        payload=payload,
        contract_path="contracts/order-created.yaml",
    )
    assert result.passed, result.issues
```

Run:

```bash
pytest -q
```

CLI:

```bash
pytest-kafka-contract validate-file contracts/order-created.yaml samples/order-created.json
```

## Quick Start: Avro Schema

Create `schemas/order-created.avsc`:

```json
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "fields": [
    { "name": "event_id", "type": "string" },
    { "name": "order_id", "type": "string" },
    { "name": "total", "type": "double" },
    {
      "name": "currency",
      "type": {
        "type": "enum",
        "name": "Currency",
        "symbols": ["USD", "CAD", "EUR"]
      },
      "default": "USD"
    }
  ]
}
```

Create `samples/order-created-avro.json`:

```json
{
  "event_id": "evt_1",
  "order_id": "ord_1",
  "total": 20.0,
  "currency": "USD"
}
```
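Conceptually, validating the record above means checking that each field carries a value of the declared Avro type, that fields with a `default` (like `currency`) may be omitted, and that enum values are among the declared symbols. A stdlib-only sketch of that idea, not the package's fastavro-backed validation:

```python
# Illustrative only: a minimal structural check of a record against the
# .avsc above. The package delegates real Avro validation to fastavro.
PRIMITIVES = {"string": str, "double": float, "long": int, "int": int, "boolean": bool}

AVSC = {
    "type": "record",
    "name": "OrderCreated",
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "order_id", "type": "string"},
        {"name": "total", "type": "double"},
        {"name": "currency",
         "type": {"type": "enum", "name": "Currency",
                  "symbols": ["USD", "CAD", "EUR"]},
         "default": "USD"},
    ],
}


def check_record(record: dict, schema: dict) -> list[str]:
    issues: list[str] = []
    for field in schema["fields"]:
        name, ftype = field["name"], field["type"]
        if name not in record:
            if "default" not in field:  # fields with defaults may be omitted
                issues.append(f"missing field: {name}")
        elif isinstance(ftype, dict) and ftype.get("type") == "enum":
            if record[name] not in ftype["symbols"]:
                issues.append(f"{name}: not one of {ftype['symbols']}")
        elif isinstance(ftype, str) and not isinstance(record[name], PRIMITIVES.get(ftype, object)):
            issues.append(f"{name}: expected Avro {ftype}")
    return issues


ok = check_record({"event_id": "evt_1", "order_id": "ord_1",
                   "total": 20.0, "currency": "USD"}, AVSC)
bad = check_record({"event_id": "evt_1", "order_id": "ord_1",
                    "total": 20.0, "currency": "GBP"}, AVSC)
```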

Use the fixture:

```python
def test_order_created_avro(kafka_contract):
    result = kafka_contract.validate_avro_record(
        record={
            "event_id": "evt_1",
            "order_id": "ord_1",
            "total": 20.0,
            "currency": "USD",
        },
        schema_path="schemas/order-created.avsc",
    )

    assert result.passed, result.issues
```

CLI:

```bash
pytest-kafka-contract avro-validate-file schemas/order-created.avsc samples/order-created-avro.json
```

## Schema Registry Check

Schema Registry already manages schemas; this package checks a subject against a local schema file from pytest or the CLI, which is useful for CI and QA automation.

```bash
pytest-kafka-contract registry-check \
  --registry-url http://localhost:8081 \
  --subject orders.created-value \
  --schema schemas/order-created.avsc \
  --compatibility BACKWARD
```

```python
def test_registry_subject(kafka_contract):
    result = kafka_contract.validate_schema_registry_subject(
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        schema_path="schemas/order-created.avsc",
        compatibility="BACKWARD",
    )
    assert result.passed, result.issues
```
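Under the hood, a check like this maps onto Schema Registry's REST API: `POST /compatibility/subjects/{subject}/versions/latest` with the candidate schema serialized as a JSON string under the `"schema"` key. A hedged sketch of how such a request can be assembled (the package's actual HTTP client and error handling may differ):

```python
import json


def compat_request(registry_url: str, subject: str, schema: dict) -> tuple[str, bytes]:
    """Build the URL and body for a Schema Registry compatibility check.

    The registry expects the candidate schema as an escaped JSON string
    under the "schema" key (Confluent REST API convention).
    """
    url = f"{registry_url.rstrip('/')}/compatibility/subjects/{subject}/versions/latest"
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return url, body


url, body = compat_request(
    "http://localhost:8081",
    "orders.created-value",
    {"type": "record", "name": "OrderCreated", "fields": []},
)
```

POSTing this body and reading `is_compatible` from the response is the remaining step; conceptually, the CLI and fixture above wrap that round trip.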

## Real Kafka JSON Validation

Requires a running Kafka broker and an install with the `[kafka]` or `[all]` extra.

```bash
pytest-kafka-contract kafka-validate-json \
  --bootstrap-servers localhost:19092 \
  --topic orders.created \
  --contract contracts/order-created.yaml \
  --timeout-ms 10000 \
  --auto-offset-reset earliest
```

```python
def test_latest_json_message(kafka_contract):
    result = kafka_contract.validate_latest_json(
        topic="orders.created",
        contract_path="contracts/order-created.yaml",
        bootstrap_servers="localhost:19092",
        timeout_ms=10000,
        auto_offset_reset="earliest",
    )
    assert result.passed, result.issues
```

## Real Kafka Avro Validation

Requires a running Kafka broker and Schema Registry, plus an install with `[all]` (or the `[kafka]`, `[avro]`, and `[registry]` extras together).

```bash
pytest-kafka-contract kafka-validate-avro \
  --bootstrap-servers localhost:19092 \
  --registry-url http://localhost:8081 \
  --topic orders.created \
  --subject orders.created-value \
  --timeout-ms 10000 \
  --auto-offset-reset earliest
```

```python
def test_latest_avro_message(kafka_contract):
    result = kafka_contract.validate_latest_avro(
        topic="orders.created",
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        bootstrap_servers="localhost:19092",
        timeout_ms=10000,
        auto_offset_reset="earliest",
    )
    assert result.passed, result.issues
```
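The Avro path assumes the Confluent wire format: a zero magic byte, a 4-byte big-endian schema id, then the Avro-encoded body. A stdlib sketch of splitting such a frame (the package does the real decoding with fastavro and the registry):

```python
import struct


def split_wire_format(frame: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed Kafka value into (schema_id, avro_body)."""
    if len(frame) < 5 or frame[0] != 0:
        raise ValueError("not Confluent wire format")
    (schema_id,) = struct.unpack(">I", frame[1:5])  # 4-byte big-endian id
    return schema_id, frame[5:]


# A frame carrying schema id 12 and an opaque Avro body.
frame = b"\x00" + struct.pack(">I", 12) + b"\x02a"
schema_id, avro_body = split_wire_format(frame)
```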

## CLI Reference

```bash
pytest-kafka-contract --help
pytest-kafka-contract init
pytest-kafka-contract validate-file contracts/order-created.yaml samples/order-created.json
pytest-kafka-contract avro-validate-file schemas/order-created.avsc samples/order-created-avro.json
pytest-kafka-contract registry-check --registry-url http://localhost:8081 --subject orders.created-value --schema schemas/order-created.avsc --compatibility BACKWARD
pytest-kafka-contract kafka-validate-json --bootstrap-servers localhost:19092 --topic orders.created --contract contracts/order-created.yaml --timeout-ms 10000 --auto-offset-reset earliest
pytest-kafka-contract kafka-validate-avro --bootstrap-servers localhost:19092 --registry-url http://localhost:8081 --topic orders.created --subject orders.created-value --timeout-ms 10000 --auto-offset-reset earliest
```

`kafka-contract` is also available as a compatibility alias.

## Pytest Fixture API

```python
kafka_contract.load_contract("contracts/order-created.yaml")
kafka_contract.validate_payload(payload, contract_path="contracts/order-created.yaml")
kafka_contract.validate_avro_record(record, schema_path="schemas/order-created.avsc")
kafka_contract.validate_schema_registry_subject(registry_url, subject, schema_path, compatibility="BACKWARD")
kafka_contract.validate_latest_json(topic, contract_path, bootstrap_servers="localhost:19092", auto_offset_reset="earliest")
kafka_contract.validate_latest_avro(topic, registry_url, subject, bootstrap_servers="localhost:19092", auto_offset_reset="earliest")
```

## Pytest Options

```bash
pytest --kafka-contract contracts/order-created.yaml
pytest --kafka-bootstrap-servers localhost:9092
pytest --kafka-timeout-ms 10000
pytest --kafka-strict
pytest --kafka-contract-report .reports/kafka-contract.md
pytest --kafka-contract-json-report .reports/kafka-contract.json
pytest --schema-registry-url http://localhost:8081
pytest --kafka-avro-subject orders.created-value
pytest --kafka-format json
```

## Reports

Markdown and JSON reports include total checks, passed checks, failed checks, format, topic, subject, schema id, issues, and metadata.

```bash
pytest --kafka-contract-report .reports/kafka-contract.md \
  --kafka-contract-json-report .reports/kafka-contract.json
```
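The JSON report mirrors the fields listed above; the exact shape below is illustrative rather than a stable schema:

```json
{
  "total": 3,
  "passed": 2,
  "failed": 1,
  "checks": [
    {
      "topic": "orders.created",
      "format": "avro",
      "subject": "orders.created-value",
      "schema_id": 12,
      "passed": false,
      "issues": ["AVRO_RECORD_INVALID at $.total"],
      "metadata": {}
    }
  ]
}
```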

Example Markdown:

```md
# Kafka Contract Report

| Total | Passed | Failed |
|---:|---:|---:|
| 3 | 2 | 1 |

## Failed Checks

### orders.created

- Format: avro
- Subject: orders.created-value
- Schema ID: 12
- Issue: AVRO_RECORD_INVALID at $.total
```

## Local Integration Testing

Docker integration tests are optional and are not run by default.
The compose stack exposes Kafka on `localhost:19092`, Schema Registry on `localhost:8081`,
and Redpanda admin on `${PKCT_REDPANDA_ADMIN_PORT:-19644}` to avoid common local `9644`
conflicts.

```bash
./scripts/run-integration.sh
```

Equivalent manual commands:

```bash
docker compose -f docker-compose.integration.yml up -d
PKCT_RUN_INTEGRATION=1 \
PKCT_BOOTSTRAP_SERVERS=localhost:19092 \
PKCT_SCHEMA_REGISTRY_URL=http://localhost:8081 \
pytest tests/integration -q -s
docker compose -f docker-compose.integration.yml down -v
```

## CI Example

```yaml
- run: python -m pip install -e ".[dev]"
- run: ruff check .
- run: mypy src
- run: pytest --cov=pytest_kafka_contract --cov-report=term-missing
- run: python -m build
- run: python -m twine check dist/*
```

## Limitations

- Alpha package.
- JSON contract syntax is intentionally small.
- Avro support expects standard `.avsc` schemas.
- Kafka Avro validation expects Confluent wire format.
- Schema Registry auth is not included in the MVP.
- This does not replace Pact or Confluent governance tools.

## Roadmap

- Protobuf support
- Schema Registry auth
- Batch topic validation
- Compatibility diff output
- Kafka key and header validation

## License

MIT.
