Metadata-Version: 2.4
Name: pytest-kafka-contract
Version: 0.1.2
Summary: A pytest plugin and CLI for validating Kafka JSON and Avro messages against contracts.
Project-URL: Homepage, https://pypi.org/project/pytest-kafka-contract/
Project-URL: Repository, https://github.com/YOUR_USERNAME/pytest-kafka-contract
Project-URL: Issues, https://github.com/YOUR_USERNAME/pytest-kafka-contract/issues
Project-URL: Changelog, https://github.com/YOUR_USERNAME/pytest-kafka-contract/blob/main/CHANGELOG.md
Author: Dharsan Guruparan
License-Expression: MIT
License-File: LICENSE
Keywords: avro,contract-testing,event-driven,integration-testing,kafka,pytest,qa,schema-registry,sdet
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: Pytest
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Quality Assurance
Classifier: Topic :: Software Development :: Testing
Requires-Python: >=3.10
Requires-Dist: jsonschema>=4.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: pytest>=8.0.0
Requires-Dist: pyyaml>=6.0.0
Requires-Dist: rich>=13.0.0
Requires-Dist: typer>=0.12.0
Provides-Extra: all
Requires-Dist: confluent-kafka>=2.3.0; extra == 'all'
Requires-Dist: fastavro>=1.9.0; extra == 'all'
Requires-Dist: httpx>=0.27.0; extra == 'all'
Provides-Extra: avro
Requires-Dist: fastavro>=1.9.0; extra == 'avro'
Provides-Extra: dev
Requires-Dist: build>=1.2.0; extra == 'dev'
Requires-Dist: confluent-kafka>=2.3.0; extra == 'dev'
Requires-Dist: fastavro>=1.9.0; extra == 'dev'
Requires-Dist: httpx>=0.27.0; extra == 'dev'
Requires-Dist: mypy>=1.10.0; extra == 'dev'
Requires-Dist: pydantic>=2.0.0; extra == 'dev'
Requires-Dist: pytest-cov>=5.0.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.6.0; extra == 'dev'
Requires-Dist: twine>=5.0.0; extra == 'dev'
Requires-Dist: types-pyyaml>=6.0; extra == 'dev'
Provides-Extra: kafka
Requires-Dist: confluent-kafka>=2.3.0; extra == 'kafka'
Provides-Extra: registry
Requires-Dist: httpx>=0.27.0; extra == 'registry'
Description-Content-Type: text/markdown

# pytest-kafka-contract

A pytest plugin and CLI for contract-testing Kafka event payloads.

`pytest-kafka-contract` helps QA and backend teams validate Kafka messages against explicit contracts before broken event changes reach consumers.

It supports:

- JSON payload validation against YAML contracts
- Avro record validation against `.avsc` schemas
- Confluent Schema Registry checks
- Real Kafka message validation
- Real Kafka Avro message decoding
- Pytest fixture API
- CLI validation commands
- Markdown and JSON reports

---

## Why

Kafka messages can break silently.

A producer can rename a field, remove a required value, change a number into a string, or publish a payload that no longer matches what downstream consumers expect.

This package catches issues like:

- Missing required fields
- Wrong field types
- Invalid constants
- Invalid enum values
- Unexpected extra fields
- Null values where null is not allowed
- Invalid Avro records
- Schema Registry subject issues
- Kafka messages that do not match their expected contract

The goal is simple:

> Make Kafka event contracts testable in normal pytest workflows.

---

## Install

Basic JSON contract validation:

```bash
pip install pytest-kafka-contract
```

Install everything, including Kafka, Avro, and Schema Registry support:

```bash
pip install "pytest-kafka-contract[all]"
```

Optional extras:

```bash
pip install "pytest-kafka-contract[avro]"
pip install "pytest-kafka-contract[kafka]"
pip install "pytest-kafka-contract[registry]"
```

---

## Quick Start: JSON Contract Validation

Create a contract file:

```bash
mkdir -p contracts
```

Create `contracts/order-created.yaml`:

```yaml
version: 1
name: order-created-v1
topic: orders.created

message:
  type: object
  required:
    - event_id
    - event_type
    - order
  properties:
    event_id:
      type: string
      nullable: false

    event_type:
      type: string
      const: OrderCreated
      nullable: false

    order:
      type: object
      required:
        - order_id
        - total
      properties:
        order_id:
          type: string

        total:
          type: number

rules:
  allow_extra_fields: false
```

Write a pytest test:

```python
def test_order_created_contract(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "OrderCreated",
            "order": {
                "order_id": "ord_1",
                "total": 20.0,
            },
        },
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues
```

Run:

```bash
pytest
```

---

## JSON Contract Failure Example

If the payload violates the contract:

```python
def test_order_created_contract_fails(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "WrongEvent",
            "order": {
                "order_id": "ord_1",
                "total": "20.0",
            },
        },
        contract_path="contracts/order-created.yaml",
    )

    assert not result.passed
```

The result contains readable issues such as:

```txt
CONST_MISMATCH
TYPE_MISMATCH
```

---

## Avro Validation

Install Avro support:

```bash
pip install "pytest-kafka-contract[avro]"
```

Create `schemas/order-created.avsc`:

```json
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "fields": [
    {
      "name": "event_id",
      "type": "string"
    },
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "total",
      "type": "double"
    },
    {
      "name": "currency",
      "type": {
        "type": "enum",
        "name": "Currency",
        "symbols": ["USD", "CAD", "EUR"]
      },
      "default": "USD"
    }
  ]
}
```

Validate a Python dictionary against the Avro schema:

```python
def test_order_created_avro(kafka_contract):
    result = kafka_contract.validate_avro_record(
        record={
            "event_id": "evt_1",
            "order_id": "ord_1",
            "total": 20.0,
            "currency": "USD",
        },
        schema_path="schemas/order-created.avsc",
    )

    assert result.passed, result.issues
```

---

## Schema Registry Checks

Install Schema Registry support:

```bash
pip install "pytest-kafka-contract[registry]"
```

Validate that a subject exists and is compatible with a local schema:

```python
def test_schema_registry_subject(kafka_contract):
    result = kafka_contract.validate_schema_registry_subject(
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        schema_path="schemas/order-created.avsc",
        compatibility="BACKWARD",
    )

    assert result.passed, result.issues
```

This can check:

- Registry reachability
- Subject existence
- Latest schema lookup
- Local schema comparison
- Compatibility result

---

## Real Kafka JSON Message Validation

Install Kafka support:

```bash
pip install "pytest-kafka-contract[kafka]"
```

Validate the latest JSON message from a Kafka topic:

```python
def test_latest_json_message(kafka_contract):
    result = kafka_contract.validate_latest_json(
        topic="orders.created",
        contract_path="contracts/order-created.yaml",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
    )

    assert result.passed, result.issues
```

The validation flow:

```txt
consume Kafka message
decode JSON
validate against YAML contract
return detailed result
```

---

## Real Kafka Avro Message Validation

Install all optional dependencies:

```bash
pip install "pytest-kafka-contract[all]"
```

Validate the latest Avro message from Kafka using Schema Registry:

```python
def test_latest_avro_message(kafka_contract):
    result = kafka_contract.validate_latest_avro(
        topic="orders.created",
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
    )

    assert result.passed, result.issues
```

The validation flow:

```txt
consume Kafka message
extract Confluent schema ID
fetch schema from Schema Registry
decode Avro payload
validate decoded record
return detailed result
```
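The "extract Confluent schema ID" step refers to Confluent's standard wire format: a magic byte `0x00`, then a 4-byte big-endian schema ID, then the Avro-encoded payload. The plugin handles this internally; the sketch below is illustrative only, using just the standard library:

```python
import struct

def split_confluent_frame(raw: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed Kafka message into (schema_id, avro_payload)."""
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not a Confluent-framed message")
    # Bytes 1-4 hold the Schema Registry ID, big-endian.
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]

framed = b"\x00" + struct.pack(">I", 42) + b"avro-bytes"
schema_id, payload = split_confluent_frame(framed)
```

The recovered `schema_id` is what gets looked up in the Schema Registry before the payload can be decoded.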

---

## Pytest Fixture API

The plugin provides a `kafka_contract` fixture.

```python
def test_with_fixture(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={"event_type": "OrderCreated"},
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues
```

Supported fixture methods:

```python
kafka_contract.load_contract(contract_path)

kafka_contract.validate_payload(
    payload,
    contract_path,
)

kafka_contract.validate_avro_record(
    record,
    schema_path,
)

kafka_contract.validate_schema_registry_subject(
    registry_url,
    subject,
    schema_path,
    compatibility=None,
)

kafka_contract.validate_latest_json(
    topic,
    contract_path,
    bootstrap_servers="localhost:9092",
    timeout_ms=10000,
)

kafka_contract.validate_latest_avro(
    topic,
    registry_url,
    subject,
    bootstrap_servers="localhost:9092",
    timeout_ms=10000,
)
```

---

## CLI Usage

Initialize example contracts and samples:

```bash
kafka-contract init
```

Validate a JSON sample file against a YAML contract:

```bash
kafka-contract validate-file \
  contracts/order-created.yaml \
  samples/order-created.json
```

Validate a JSON sample file against an Avro schema:

```bash
kafka-contract avro-validate-file \
  schemas/order-created.avsc \
  samples/order-created-avro.json
```

Check Schema Registry:

```bash
kafka-contract registry-check \
  --registry-url http://localhost:8081 \
  --subject orders.created-value \
  --schema schemas/order-created.avsc \
  --compatibility BACKWARD
```

Validate the latest JSON message from Kafka:

```bash
kafka-contract kafka-validate-json \
  --bootstrap-servers localhost:9092 \
  --topic orders.created \
  --contract contracts/order-created.yaml \
  --timeout-ms 10000
```

Validate the latest Avro message from Kafka:

```bash
kafka-contract kafka-validate-avro \
  --bootstrap-servers localhost:9092 \
  --registry-url http://localhost:8081 \
  --topic orders.created \
  --subject orders.created-value \
  --timeout-ms 10000
```

Show help:

```bash
kafka-contract --help
```

---

## Pytest CLI Options

```bash
pytest \
  --kafka-contract contracts/order-created.yaml \
  --kafka-bootstrap-servers localhost:9092 \
  --kafka-timeout-ms 10000
```

Report options:

```bash
pytest \
  --kafka-contract-report .reports/kafka-contract.md \
  --kafka-contract-json-report .reports/kafka-contract.json
```

Available options:

```txt
--kafka-contract
--kafka-bootstrap-servers
--kafka-timeout-ms
--kafka-strict
--kafka-contract-report
--kafka-contract-json-report
--schema-registry-url
--kafka-avro-subject
--kafka-format
```

---

## Contract Rules

Supported JSON contract types:

```txt
string
number
integer
boolean
object
array
null
```

Supported validation rules:

```txt
required
properties
nullable
const
enum
format: datetime
items
allow_extra_fields
```

Example:

```yaml
message:
  type: object
  required:
    - event_id
    - status
  properties:
    event_id:
      type: string
      nullable: false

    status:
      type: string
      enum:
        - CREATED
        - UPDATED
        - CANCELLED

rules:
  allow_extra_fields: false
```
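For intuition, the core of these rules (required fields, type checks, extra-field rejection) can be approximated in a few lines of plain Python. This is an illustrative sketch of the semantics, not the plugin's actual implementation:

```python
# Illustrative only: a hand-rolled subset of the contract rules above.
# (Edge cases such as bool being a subclass of int are ignored here.)
TYPES = {"string": str, "number": (int, float), "integer": int,
         "boolean": bool, "object": dict, "array": list}

def check(payload: dict, spec: dict, allow_extra: bool = False) -> list[str]:
    issues = []
    props = spec.get("properties", {})
    for field in spec.get("required", []):
        if field not in payload:
            issues.append(f"MISSING_REQUIRED_FIELD: {field}")
    for field, value in payload.items():
        if field not in props:
            if not allow_extra:
                issues.append(f"EXTRA_FIELD: {field}")
            continue
        expected = TYPES.get(props[field].get("type"))
        if expected and not isinstance(value, expected):
            issues.append(f"TYPE_MISMATCH: {field}")
    return issues

issues = check(
    {"event_id": 1, "oops": "x"},
    {"required": ["event_id", "status"],
     "properties": {"event_id": {"type": "string"}}},
)
```

Here `issues` reports the missing `status` field, the wrong type for `event_id`, and the unexpected `oops` field.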

---

## Result Model

All validators return a result object.

```python
result.passed
result.issues
result.metadata
```

Example:

```python
result = kafka_contract.validate_payload(payload, "contracts/order-created.yaml")

if not result.passed:
    for issue in result.issues:
        print(issue.code, issue.path, issue.message)
```

Issue examples:

```txt
MISSING_REQUIRED_FIELD
TYPE_MISMATCH
CONST_MISMATCH
ENUM_MISMATCH
EXTRA_FIELD
NULL_NOT_ALLOWED
AVRO_RECORD_INVALID
REGISTRY_SUBJECT_NOT_FOUND
REGISTRY_COMPATIBILITY_FAILED
KAFKA_NO_MESSAGE
KAFKA_DESERIALIZATION_FAILED
```

---

## Reports

Markdown and JSON reports can be created from pytest runs:

```bash
pytest \
  --kafka-contract-report .reports/kafka-contract.md \
  --kafka-contract-json-report .reports/kafka-contract.json
```

Markdown report includes:

```txt
summary
passed checks
failed checks
issue codes
topic metadata
schema metadata
```

JSON report includes structured output for CI pipelines, artifacts, and Slack summaries.
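
A CI step can parse the JSON report and gate the pipeline on it. The field names below (`checks`, `name`, `passed`) are assumptions for illustration; adjust them to match the report your version actually emits:

```python
import json

# Assumed report shape; keys here are illustrative, not guaranteed.
report_text = """
{"checks": [
  {"name": "order-created-v1", "passed": true, "issues": []},
  {"name": "order-updated-v1", "passed": false, "issues": ["TYPE_MISMATCH"]}
]}
"""

report = json.loads(report_text)
failed = [c["name"] for c in report["checks"] if not c["passed"]]
```

The step would then exit non-zero (or post a Slack summary) whenever `failed` is non-empty.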

---

## Local Kafka Example

Start local Kafka-compatible infrastructure:

```bash
docker compose -f docker-compose.integration.yml up -d
```

Run integration tests:

```bash
PKCT_RUN_INTEGRATION=1 pytest tests/integration -q
```

Stop services:

```bash
docker compose -f docker-compose.integration.yml down -v
```

---

## Development

Create a virtual environment:

```bash
python3 -m venv .venv
source .venv/bin/activate
```

Install in editable mode:

```bash
python -m pip install --upgrade pip
python -m pip install -e ".[dev]"
```

Run tests:

```bash
pytest
```

Run full integration suite:

```bash
docker compose -f docker-compose.integration.yml up -d
PKCT_RUN_INTEGRATION=1 pytest -q
docker compose -f docker-compose.integration.yml down -v
```

Run quality checks:

```bash
ruff check .
mypy src
```

Build package:

```bash
python -m build
python -m twine check dist/*
```

---

## Tested Workflows

The test suite covers:

- Unit validation
- JSON contract validation
- Avro schema validation
- Schema Registry client behavior
- Kafka validation logic
- Pytest plugin discovery
- Pytest fixture usage
- CLI workflows
- README command workflows
- Temporary user project workflow
- Wheel install workflow
- Real Kafka integration tests

Current local test suite result:

```txt
111 passed
```

---

## When To Use This

Use this package when you want to test:

- Kafka event payloads
- Redpanda event payloads
- JSON event contracts
- Avro records
- Schema Registry subjects
- Event-driven microservices
- CDC pipeline outputs
- Producer/consumer contract expectations
- QA automation checks in CI

It is especially useful when your team already uses pytest and wants lightweight Kafka contract checks without adopting a larger contract-testing platform immediately.

---

## Limitations

This package is focused on pytest-native Kafka contract testing.

It is not:

- A full replacement for Confluent Schema Registry
- A full Pact Broker replacement
- A Kafka monitoring platform
- A production schema governance system

Recommended use:

```txt
use Schema Registry for schema governance
use pytest-kafka-contract for automated test validation
```

---

## Roadmap

Planned improvements:

- Protobuf support
- Kafka header validation
- Kafka key validation
- JSON Schema export
- Better CI report summaries
- GitHub Actions examples
- More built-in contract examples

---

## License

MIT License

Copyright (c) 2026 Dharsan Guruparan

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

