Metadata-Version: 2.4
Name: pytest-kafka-contract
Version: 0.2.4
Summary: A pytest plugin and CLI for validating Kafka JSON and Avro messages against contracts.
Project-URL: Homepage, https://github.com/dharzan/pytest-kafka-contract
Project-URL: Repository, https://github.com/dharzan/pytest-kafka-contract
Project-URL: Issues, https://github.com/dharzan/pytest-kafka-contract/issues
Project-URL: Changelog, https://github.com/dharzan/pytest-kafka-contract/blob/main/CHANGELOG.md
Author: Dharsan Guruparan
License-Expression: MIT
License-File: LICENSE
Keywords: avro,contract-testing,event-driven,integration-testing,kafka,pytest,qa,schema-registry,sdet
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: Pytest
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Quality Assurance
Classifier: Topic :: Software Development :: Testing
Requires-Python: >=3.10
Requires-Dist: jsonschema>=4.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: pytest>=8.0.0
Requires-Dist: pyyaml>=6.0.0
Requires-Dist: rich>=13.0.0
Requires-Dist: typer<0.25.1,>=0.12.0
Provides-Extra: all
Requires-Dist: confluent-kafka>=2.3.0; extra == 'all'
Requires-Dist: fastavro>=1.9.0; extra == 'all'
Requires-Dist: httpx>=0.27.0; extra == 'all'
Provides-Extra: avro
Requires-Dist: fastavro>=1.9.0; extra == 'avro'
Provides-Extra: dev
Requires-Dist: build>=1.2.0; extra == 'dev'
Requires-Dist: confluent-kafka>=2.3.0; extra == 'dev'
Requires-Dist: fastavro>=1.9.0; extra == 'dev'
Requires-Dist: httpx>=0.27.0; extra == 'dev'
Requires-Dist: mypy>=1.10.0; extra == 'dev'
Requires-Dist: pydantic>=2.0.0; extra == 'dev'
Requires-Dist: pytest-cov>=5.0.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.6.0; extra == 'dev'
Requires-Dist: twine>=5.0.0; extra == 'dev'
Requires-Dist: types-pyyaml>=6.0; extra == 'dev'
Provides-Extra: kafka
Requires-Dist: confluent-kafka>=2.3.0; extra == 'kafka'
Provides-Extra: registry
Requires-Dist: httpx>=0.27.0; extra == 'registry'
Description-Content-Type: text/markdown

# pytest-kafka-contract

A pytest plugin and CLI for contract testing and flow testing of Kafka event payloads.

`pytest-kafka-contract` removes Kafka test boilerplate and helps QA and backend teams validate real JSON/Avro Kafka messages against explicit contracts, check Schema Registry subjects, and verify that events flow correctly through services, all from normal pytest tests.

It supports:

- JSON payload validation against YAML contracts
- Avro record validation against `.avsc` schemas
- Confluent Schema Registry checks
- Real Kafka message validation
- Real Kafka Avro message decoding
- **Kafka black-box flow testing** — produce → consume → assert
- **Negative flow testing** — assert that an event is NOT forwarded
- **YAML-driven flow specs** via the `flow-run` CLI command
- Pytest fixture API
- CLI validation commands
- Markdown and JSON reports

---

## Why

Kafka messages can break silently.

A producer can rename a field, remove a required value, change a number into a string, or publish a payload that no longer matches what downstream consumers expect. A routing service can silently drop events it should forward, or forward events it should filter.

This package catches issues like:

- Missing required fields
- Wrong field types
- Invalid constants
- Invalid enum values
- Unexpected extra fields
- Null values where null is not allowed
- Invalid Avro records
- Schema Registry subject issues
- Kafka messages that do not match their expected contract
- Events that fail to reach a destination topic
- Events that reach a destination topic when they should not

The goal is simple:

> Make Kafka event contracts and flows testable in normal pytest workflows.

---

## Why Not Just Normal Assertions?

Normal assertions are still useful.

This package does not replace pytest assertions. It removes the Kafka boilerplate around them.

Manual Kafka tests usually repeat the same setup:

- create producer
- create consumer
- create unique consumer group
- subscribe to topic
- produce test message
- poll with timeout
- decode JSON or Avro
- match correlation ID
- validate schema or contract
- format readable failures

`pytest-kafka-contract` standardizes that part.

Then you can still write normal pytest assertions for business-specific logic.

Use the package for:

- Kafka producing
- Kafka consuming
- polling
- timeout handling
- correlation matching
- JSON contract validation
- Avro validation
- Schema Registry checks
- readable result objects

Use normal pytest assertions for:

- business math
- field relationships
- custom transformation rules
- enrichment logic
- domain-specific expectations

## Package Levels

You can use this package at several levels:

```txt
Level 1: validate local JSON file
Level 2: validate local Avro record
Level 3: check Schema Registry subject
Level 4: consume latest Kafka message and validate
Level 5: produce source message, consume destination message, validate full flow
Level 6: run custom pytest assertions on consumed event
```

---

## Contract Testing vs Flow Testing

| | Contract test | Flow test |
|---|---|---|
| **Question** | Does this event look right? | Does this app move/transform/filter events correctly? |
| **How** | Validate payload shape/values | Produce to source topic, assert what arrives on destination topic |
| **When** | On any payload, offline | Requires running Kafka and a live app |
| **Fixture method** | `validate_payload`, `validate_avro_record` | `run_json_flow`, `expect_no_json_flow`, `run_avro_flow` |

---

## Install

Basic JSON contract validation (no Kafka required):

```bash
pip install pytest-kafka-contract
```

Install everything, including Kafka, Avro, Schema Registry, and flow testing support:

```bash
pip install "pytest-kafka-contract[all]"
```

Optional extras:

```bash
pip install "pytest-kafka-contract[kafka]"     # real Kafka validation + flow testing
pip install "pytest-kafka-contract[avro]"      # Avro schema validation
pip install "pytest-kafka-contract[registry]"  # Schema Registry checks
```

---

## Quick Start: JSON Contract Validation

Create a contract file:

```bash
mkdir -p contracts
```

Create `contracts/order-created.yaml`:

```yaml
version: 1
name: order-created-v1
topic: orders.created

message:
  type: object
  required:
    - event_id
    - event_type
    - order
  properties:
    event_id:
      type: string
      nullable: false

    event_type:
      type: string
      const: OrderCreated
      nullable: false

    order:
      type: object
      required:
        - order_id
        - total
      properties:
        order_id:
          type: string

        total:
          type: number

rules:
  allow_extra_fields: false
```

Write a pytest test:

```python
def test_order_created_contract(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "OrderCreated",
            "order": {
                "order_id": "ord_1",
                "total": 20.0,
            },
        },
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues
```

Run:

```bash
pytest
```

---

## JSON Contract Failure Example

If the payload is wrong:

```python
def test_order_created_contract_fails(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={
            "event_id": "evt_1",
            "event_type": "WrongEvent",
            "order": {
                "order_id": "ord_1",
                "total": "20.0",
            },
        },
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues  # this assertion fails and surfaces the issues
```

The result contains readable issues such as:

```txt
CONST_MISMATCH
TYPE_MISMATCH
```
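
Issues come back as structured objects rather than bare strings, so you can render them into a readable failure message yourself. A small sketch (assuming `code`/`path`/`message` attributes as documented in the Result Model section; the `$`-style path mirrors the CLI output):

```python
from types import SimpleNamespace

def format_issues(issues):
    # Render structured issues into one readable failure message
    return "\n".join(f"{i.code} at {i.path}: {i.message}" for i in issues)

# stand-in for a real ContractIssue, for illustration only
demo = [SimpleNamespace(code="CONST_MISMATCH", path="$.event_type",
                        message="expected 'OrderCreated'")]

# usage in a test:
# assert result.passed, format_issues(result.issues)
```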

---

## Avro Validation

Install Avro support:

```bash
pip install "pytest-kafka-contract[avro]"
```

Create `schemas/order-created.avsc`:

```json
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "fields": [
    {
      "name": "event_id",
      "type": "string"
    },
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "total",
      "type": "double"
    },
    {
      "name": "currency",
      "type": {
        "type": "enum",
        "name": "Currency",
        "symbols": ["USD", "CAD", "EUR"]
      },
      "default": "USD"
    }
  ]
}
```

Validate a Python dictionary against the Avro schema:

```python
def test_order_created_avro(kafka_contract):
    result = kafka_contract.validate_avro_record(
        record={
            "event_id": "evt_1",
            "order_id": "ord_1",
            "total": 20.0,
            "currency": "USD",
        },
        schema_path="schemas/order-created.avsc",
    )

    assert result.passed, result.issues
```

---

## Schema Registry Checks

Install Schema Registry support:

```bash
pip install "pytest-kafka-contract[registry]"
```

Validate that a subject exists and is compatible with a local schema:

```python
def test_schema_registry_subject(kafka_contract):
    result = kafka_contract.validate_schema_registry_subject(
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        schema_path="schemas/order-created.avsc",
        compatibility="BACKWARD",
    )

    assert result.passed, result.issues
```

This can check:

- Registry reachability
- Subject existence
- Latest schema lookup
- Local schema comparison
- Compatibility result
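
Under the hood, these checks use Schema Registry's standard REST API. As a rough, stdlib-only sketch of the "subject existence" and "latest schema lookup" steps (the package itself uses `httpx`, and `fetch_latest_schema` is a hypothetical helper, not part of the package API):

```python
import json
import urllib.request

def latest_subject_url(registry_url: str, subject: str) -> str:
    # Confluent Schema Registry REST path for a subject's latest schema version
    return f"{registry_url.rstrip('/')}/subjects/{subject}/versions/latest"

def fetch_latest_schema(registry_url: str, subject: str) -> dict:
    # Returns the registry response: {"subject", "version", "id", "schema", ...};
    # a 404 here means the subject does not exist
    with urllib.request.urlopen(latest_subject_url(registry_url, subject),
                                timeout=5) as resp:
        return json.load(resp)
```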

### Schema Registry vs this package

Schema Registry governs schemas. `pytest-kafka-contract` validates real app behavior in tests.

For example, Schema Registry can say `OrderCreated` is compatible. This package can prove checkout actually emitted `OrderCreated` with the expected `order_id`, `total`, Kafka topic, wire format, and downstream flow behavior.

---

## Real Kafka JSON Message Validation

Install Kafka support:

```bash
pip install "pytest-kafka-contract[kafka]"
```

Validate the latest JSON message from a Kafka topic:

```python
def test_latest_json_message(kafka_contract):
    result = kafka_contract.validate_latest_json(
        topic="orders.created",
        contract_path="contracts/order-created.yaml",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
        auto_offset_reset="latest",  # "latest" or "earliest"
    )

    assert result.passed, result.issues
```

This flow:

```txt
consume Kafka message
decode JSON
validate against YAML contract
return detailed result
```

---

## Real Kafka Avro Message Validation

Install all optional dependencies:

```bash
pip install "pytest-kafka-contract[all]"
```

Validate the latest Avro message from Kafka using Schema Registry:

```python
def test_latest_avro_message(kafka_contract):
    result = kafka_contract.validate_latest_avro(
        topic="orders.created",
        registry_url="http://localhost:8081",
        subject="orders.created-value",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
        auto_offset_reset="latest",  # "latest" or "earliest"
    )

    assert result.passed, result.issues
```

This flow:

```txt
consume Kafka message
extract Confluent schema ID
fetch schema from Schema Registry
decode Avro payload
validate decoded record
return detailed result
```

---

## Kafka Flow Testing

Flow testing lets you verify that your service correctly moves, transforms, or filters Kafka events — without touching its internals.

Install Kafka support:

```bash
pip install "pytest-kafka-contract[kafka]"
```

`flow-run` runs a full Kafka black-box flow from YAML:

```bash
kafka-contract flow-run flows/paid-order-flow.yaml
```

It does this in order:

```txt
1. read flow YAML
2. subscribe to destination topic first
3. produce payload to source topic
4. wait for the real app/service to process the event
5. consume destination topic
6. match the correct message by correlation_id
7. validate expected fields
8. optionally validate a JSON contract or Avro schema
9. print pass/fail
10. exit 0 on pass, exit 1 on fail
```

`flow-run` does not start your app. Your service must already be running and connected to Kafka.

### Positive flow: assert an event reaches the destination

```python
def test_paid_order_reaches_destination(kafka_contract):
    cid = kafka_contract.new_correlation_id()

    result = kafka_contract.run_json_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "correlation_id": cid,
            "order_id": "ord_123",
            "status": "PAID",
            "amount": 49.99,
        },
        expect={
            "correlation_id": cid,
            "status": "PAID",
        },
        contract_path="contracts/paid-order.yaml",
        bootstrap_servers="localhost:9092",
        timeout_ms=10000,
    )

    assert result.passed, result.issues
```

`run_json_flow` does the following in order:

```txt
1. subscribe to destination_topic
2. produce payload to source_topic (with injected correlation_id)
3. poll destination_topic until the correlated message arrives
4. assert expected subset fields match (if expect= given)
5. validate against contract (if contract_path= given)
6. return FlowResult
```

Subscribing to the destination topic before producing ensures that fast-arriving messages are not missed.
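
The `expect=` check in step 4 is a subset match: each expected key must appear in the destination message with an equal value, and extra keys in the message are ignored. A minimal sketch of that idea (illustrative only, not the package's implementation):

```python
def matches_expected(message: dict, expect: dict) -> bool:
    # Subset match: every expected key must be present with an equal value;
    # keys in the message that are not in `expect` are ignored
    return all(k in message and message[k] == v for k, v in expect.items())

matches_expected({"status": "PAID", "amount": 49.99}, {"status": "PAID"})  # True
matches_expected({"status": "CANCELLED"}, {"status": "PAID"})              # False
```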

### Negative flow: assert an event does NOT reach the destination

```python
def test_cancelled_order_is_filtered_out(kafka_contract):
    cid = kafka_contract.new_correlation_id()

    result = kafka_contract.expect_no_json_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "correlation_id": cid,
            "order_id": "ord_123",
            "status": "CANCELLED",
        },
        bootstrap_servers="localhost:9092",
        timeout_ms=5000,
    )

    assert result.passed, result.issues
```

`expect_no_json_flow` passes when no correlated message arrives on the destination topic within `timeout_ms`.

### CLI Flow Assertions

The CLI supports simple expected-field assertions through `expect`.

```yaml
name: paid-order-reaches-destination
format: json

bootstrap_servers: localhost:9092
timeout_ms: 10000

source_topic: src.rds.orders
destination_topic: dest.filtered.orders
correlation_field: correlation_id

payload:
  order_id: ord_123
  status: PAID
  amount: 49.99

expect:
  order_id: ord_123
  status: PAID
  amount: 49.99

contract_path: contracts/paid-order.yaml
```

Run:

```bash
kafka-contract flow-run flows/paid-order.yaml
```

Use CLI flow specs for simple checks. Use pytest flow helpers when you need custom Python assertions.

Avro flow spec:

```yaml
name: avro-paid-order-flow
format: avro
bootstrap_servers: localhost:9092
schema_registry_url: http://localhost:8081
timeout_ms: 10000

source_topic: src.rds.orders
destination_topic: dest.filtered.orders
source_subject: src.rds.orders-value
destination_subject: dest.filtered.orders-value
correlation_field: correlation_id

payload:
  correlation_id: test-123
  order_id: ord_123
  status: PAID

expect:
  correlation_id: test-123
  status: PAID
```

Run it the same way:

```bash
kafka-contract flow-run flows/avro-paid-order-flow.yaml
```

Negative flow spec using `mode: expect_no_message`:

```yaml
# examples/flows/cancelled-order-filtered-out.yaml
name: cancelled-order-filtered-out
format: json
mode: expect_no_message
bootstrap_servers: localhost:9092
timeout_ms: 10000

source_topic: src.rds.orders
destination_topic: dest.filtered.orders
correlation_field: correlation_id

payload:
  order_id: ord_456
  status: CANCELLED
  amount: 19.99
```

### Correlation ID

By default, a `uuid4` is generated and injected into the payload under `correlation_field` (default: `correlation_id`). The consumer polls for the message whose value contains the matching correlation ID.

You can supply a fixed ID:

```python
result = kafka_contract.run_json_flow(
    ...,
    correlation_id="my-fixed-id",
)
```

If the payload already contains `correlation_field`, it is not overwritten.

---

## Custom Assertions After a Flow

The flow helpers return a result object.

You can first let the package validate the Kafka flow, then write your own pytest assertions against the consumed message.

```python
def test_paid_order_flow_has_correct_business_logic(kafka_contract):
    result = kafka_contract.run_json_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "order_id": "ord_123",
            "status": "PAID",
            "subtotal": 100.00,
            "tax": 8.25,
        },
        expect={
            "order_id": "ord_123",
            "status": "PAID",
        },
        contract_path="contracts/filtered-order.yaml",
        bootstrap_servers="localhost:9092",
        timeout_ms=30000,
    )

    # Package-level checks
    assert result.passed, result.issues
    assert result.consumed_message is not None

    # User-level custom business checks
    event = result.consumed_message.value
    assert event is not None

    assert event["total"] == 108.25
    assert event["total"] == event["subtotal"] + event["tax"]
    assert event["status"] == "PAID"
    assert event["processed_by"] == "order-filter-service"
```

Recommended pattern:

```txt
Package proves Kafka flow worked.
Your pytest assertions prove business logic is correct.
```

## What Custom Assertions Are Good For

Use custom assertions when contract shape is not enough.

```python
# Field relationship
assert event["total"] == event["subtotal"] + event["tax"]

# Business rule
assert event["status"] in ["PAID", "SETTLED"]

# Enrichment check
assert event["processed_by"] == "order-filter-service"

# Value copied from input
assert event["order_id"] == input_order_id

# Timestamp generated
assert event["processed_at"] is not None

# Domain rule
assert event["discount"] <= event["subtotal"]
```

## CLI Flow vs Pytest Flow

| Need | Use |
|---|---|
| Quick contract check from terminal | CLI |
| CI smoke check | CLI |
| YAML-defined flow test | CLI |
| Simple expected fields | CLI `expect` |
| Business math assertions | Pytest |
| Field relationship checks | Pytest |
| Reusing test fixtures/data setup | Pytest |
| Complex app-specific assertions | Pytest |

Rule of thumb:

```txt
CLI = simple portable flow check
pytest = full test logic with custom assertions
```

## Flow Result Object

Flow helpers return a result object.

Useful fields:

```python
result.passed
result.issues
result.correlation_id
result.produced_message
result.consumed_message
result.metadata
```

Example:

```python
result = kafka_contract.run_json_flow(...)

assert result.passed, result.issues
assert result.consumed_message is not None

event = result.consumed_message.value
assert event is not None
assert event["status"] == "PAID"
```

## Example: Test a Kafka Filter Service

Suppose your service does this:

```txt
Consumes: src.rds.orders
Publishes: dest.filtered.orders
Rule: only PAID orders should be forwarded
Rule: CANCELLED orders should be dropped
```

Positive flow:

```python
def test_paid_order_reaches_destination(kafka_contract):
    result = kafka_contract.run_json_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "order_id": "ord_paid_1",
            "status": "PAID",
            "amount": 49.99,
        },
        expect={
            "order_id": "ord_paid_1",
            "status": "PAID",
        },
        contract_path="contracts/paid-order.yaml",
        bootstrap_servers="localhost:9092",
    )

    assert result.passed, result.issues
```

Negative flow:

```python
def test_cancelled_order_is_not_forwarded(kafka_contract):
    cid = kafka_contract.new_correlation_id()

    result = kafka_contract.expect_no_json_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "correlation_id": cid,
            "order_id": "ord_cancelled_1",
            "status": "CANCELLED",
            "amount": 49.99,
        },
        correlation_id=cid,
        bootstrap_servers="localhost:9092",
        timeout_ms=5000,
    )

    assert result.passed, result.issues
```

## Example: Avro Flow With Schema Registry

Use this when your Kafka messages use Confluent Avro wire format.

```python
def test_paid_order_avro_flow(kafka_contract):
    result = kafka_contract.run_avro_flow(
        source_topic="src.rds.orders",
        destination_topic="dest.filtered.orders",
        payload={
            "order_id": "ord_123",
            "status": "PAID",
            "amount": 49.99,
        },
        expect={
            "order_id": "ord_123",
            "status": "PAID",
        },
        registry_url="http://localhost:8081",
        source_subject="src.rds.orders-value",
        destination_subject="dest.filtered.orders-value",
        bootstrap_servers="localhost:9092",
        timeout_ms=30000,
    )

    assert result.passed, result.issues
    assert result.consumed_message is not None

    event = result.consumed_message.value
    assert event is not None
    assert event["amount"] > 0
```

---

## Pytest Fixture API

The plugin provides a `kafka_contract` fixture.

```python
def test_with_fixture(kafka_contract):
    result = kafka_contract.validate_payload(
        payload={"event_type": "OrderCreated"},
        contract_path="contracts/order-created.yaml",
    )

    assert result.passed, result.issues
```

Supported fixture methods:

```python
# Contract validation (no Kafka required)
kafka_contract.new_correlation_id()

kafka_contract.load_contract(contract_path)

kafka_contract.validate_payload(
    payload,
    contract_path,
)

kafka_contract.validate_avro_record(
    record,
    schema_path,
)

kafka_contract.validate_schema_registry_subject(
    registry_url,
    subject,
    schema_path,
    compatibility=None,
)

# Real Kafka – consume latest message and validate
kafka_contract.validate_latest_json(
    topic,
    contract_path,
    bootstrap_servers="localhost:9092",
    timeout_ms=10000,
    auto_offset_reset="latest",
)

kafka_contract.validate_latest_avro(
    topic,
    registry_url,
    subject,
    bootstrap_servers="localhost:9092",
    timeout_ms=10000,
    auto_offset_reset="latest",
)

# Flow testing – produce to source, assert on destination
kafka_contract.run_json_flow(
    source_topic,
    destination_topic,
    payload,
    expect=None,           # optional: subset of destination message to assert
    contract_path=None,    # optional: YAML contract to validate destination message
    correlation_field="correlation_id",
    correlation_id=None,   # auto-generated as uuid4 if not given
    bootstrap_servers=None,
    timeout_ms=None,
)

kafka_contract.expect_no_json_flow(
    source_topic,
    destination_topic,
    payload,
    correlation_field="correlation_id",
    correlation_id=None,
    bootstrap_servers=None,
    timeout_ms=None,
)

kafka_contract.run_avro_flow(
    source_topic,
    destination_topic,
    payload,
    registry_url,
    source_subject,
    destination_subject,
    expect=None,
    expected_destination_schema_id=None,
    bootstrap_servers=None,
    timeout_ms=None,
)
```

---

## CLI Usage

Initialize example contracts and samples:

```bash
kafka-contract init
```

Validate a JSON sample file against a YAML contract:

```bash
kafka-contract validate-file \
  contracts/order-created.yaml \
  samples/order-created.json
```

Validate a JSON sample file against an Avro schema:

```bash
kafka-contract avro-validate-file \
  schemas/order-created.avsc \
  samples/order-created-avro.json
```

Check Schema Registry:

```bash
kafka-contract registry-check \
  --registry-url http://localhost:8081 \
  --subject orders.created-value \
  --schema schemas/order-created.avsc \
  --compatibility BACKWARD
```

Validate the latest JSON message from Kafka:

```bash
kafka-contract kafka-validate-json \
  --bootstrap-servers localhost:9092 \
  --topic orders.created \
  --contract contracts/order-created.yaml \
  --timeout-ms 10000
```

Validate the latest Avro message from Kafka:

```bash
kafka-contract kafka-validate-avro \
  --bootstrap-servers localhost:9092 \
  --registry-url http://localhost:8081 \
  --topic orders.created \
  --subject orders.created-value \
  --timeout-ms 10000
```

Run a YAML-defined flow test:

```bash
kafka-contract flow-run examples/flows/paid-order-flow.yaml
```

This produces output like:

```txt
PASS  paid-order-reaches-destination
      source:      src.rds.orders
      destination: dest.filtered.orders
      correlation: a1b2c3d4-...
```

Or on failure:

```txt
FAIL  cancelled-order-filtered-out
      FLOW_UNEXPECTED_MESSAGE at $: Expected no destination message but found one...
```

Show help:

```bash
kafka-contract --help
```

---

## Pytest CLI Options

```bash
pytest \
  --kafka-contract contracts/order-created.yaml \
  --kafka-bootstrap-servers localhost:9092 \
  --kafka-timeout-ms 10000
```

Report options:

```bash
pytest \
  --kafka-contract-report .reports/kafka-contract.md \
  --kafka-contract-json-report .reports/kafka-contract.json
```

Available options:

```txt
--kafka-contract
--kafka-bootstrap-servers
--kafka-timeout-ms
--kafka-strict
--kafka-contract-report
--kafka-contract-json-report
--schema-registry-url
--kafka-avro-subject
--kafka-format
```

---

## Contract Rules

Supported JSON contract types:

```txt
string
number
integer
boolean
object
array
null
```

Supported validation rules:

```txt
required
properties
nullable
const
enum
format: datetime
items
allow_extra_fields
```

Example:

```yaml
message:
  type: object
  required:
    - event_id
    - status
  properties:
    event_id:
      type: string
      nullable: false

    status:
      type: string
      enum:
        - CREATED
        - UPDATED
        - CANCELLED

rules:
  allow_extra_fields: false
```
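
To make the rule semantics concrete, here is a tiny illustrative checker for a single required field, covering `nullable` and `const` (a sketch only, not the package's validator; it assumes fields are nullable unless `nullable: false` is set, which may not match the package's defaults):

```python
def check_field(spec: dict, payload: dict, name: str) -> list[str]:
    # Illustrates the `required`, `nullable`, and `const` rules for one field
    if name not in payload:
        return [f"MISSING_REQUIRED_FIELD: {name}"]
    value = payload[name]
    issues = []
    if value is None and not spec.get("nullable", True):
        issues.append(f"NULL_NOT_ALLOWED: {name}")
    elif "const" in spec and value != spec["const"]:
        issues.append(f"CONST_MISMATCH: {name}")
    return issues
```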

---

## Result Model

Contract validators return a `ContractResult`:

```python
result.passed      # bool
result.issues      # list of ContractIssue
result.metadata    # dict – topic, offset, schema_id, etc.
```

Flow methods return a `FlowResult`:

```python
result.passed              # bool
result.source_topic        # str
result.destination_topic   # str
result.correlation_id      # str
result.produced_message    # FlowMessage | None
result.consumed_message    # FlowMessage | None  (None on FLOW_TIMEOUT)
result.issues              # list of ContractIssue
result.metadata            # dict – partition, offset
```

Inspecting issues:

```python
for issue in result.issues:
    print(issue.code, issue.path, issue.message)
```

Contract issue codes:

```txt
MISSING_REQUIRED_FIELD
TYPE_MISMATCH
CONST_MISMATCH
ENUM_MISMATCH
EXTRA_FIELD
NULL_NOT_ALLOWED
AVRO_RECORD_INVALID
REGISTRY_SUBJECT_NOT_FOUND
REGISTRY_COMPATIBILITY_FAILED
KAFKA_NO_MESSAGE
KAFKA_DESERIALIZATION_FAILED
```

Flow issue codes:

```txt
FLOW_TIMEOUT               – no matching message arrived within timeout_ms
FLOW_EXPECTATION_FAILED    – destination message did not match expected subset
FLOW_CONTRACT_FAILED       – destination message failed contract validation
FLOW_UNEXPECTED_MESSAGE    – message arrived when expect_no_json_flow expected silence
FLOW_DESERIALIZATION_FAILED – destination message could not be decoded
FLOW_SCHEMA_ID_MISMATCH    – Avro destination schema_id did not match expected
```

---

## Reports

Markdown and JSON reports can be created from pytest runs:

```bash
pytest \
  --kafka-contract-report .reports/kafka-contract.md \
  --kafka-contract-json-report .reports/kafka-contract.json
```

Markdown report includes:

```txt
summary
passed checks
failed checks
issue codes
topic metadata
schema metadata
```

JSON report includes structured output for CI pipelines, artifacts, and Slack summaries.

---

## Local Kafka Example

Start local Kafka-compatible infrastructure:

```bash
docker compose -f docker-compose.integration.yml up -d
```

Run integration tests:

```bash
PKCT_RUN_INTEGRATION=1 pytest tests/integration -q
```

Stop services:

```bash
docker compose -f docker-compose.integration.yml down -v
```

---

## What Can This Test?

This package is generalized for Kafka event testing.

Use it for:

- producer event contract tests
- consumer output tests
- Kafka filter services
- CDC pipeline validation
- Debezium output checks
- enrichment pipeline tests
- event transformation tests
- Avro message validation
- Schema Registry subject checks
- CI smoke tests against Kafka topics
- black-box integration tests

It does not care what app framework you use.

Your app can be:

- Java Spring Boot
- Node.js
- Python
- Go
- .NET
- anything that reads/writes Kafka

The package only cares about Kafka inputs, Kafka outputs, contracts, and test results.

It is especially useful when your team already uses pytest and wants lightweight Kafka contract checks without adopting a larger contract-testing platform immediately.

---

## Limitations

This package is focused on pytest-native Kafka contract testing.

It is not:

- A full replacement for Confluent Schema Registry
- A full Pact Broker replacement
- A Kafka monitoring platform
- A production schema governance system

Recommended use:

```txt
use Schema Registry for schema governance
use pytest-kafka-contract for automated test validation
```

---

## License

MIT License

Copyright (c) 2026 Dharsan Guruparan

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
