Metadata-Version: 2.4
Name: rise-broker
Version: 0.1.0
Summary: Re-usable Kafka broker wrapper for microservices
Classifier: Programming Language :: Python :: 3
Classifier: Operating System :: OS Independent
Requires-Python: >=3.11
Description-Content-Type: text/markdown
Requires-Dist: aiokafka>=0.8.1
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: mypy>=1.8; extra == "dev"
Requires-Dist: ruff>=0.3; extra == "dev"

# broker

Reusable Kafka broker wrapper for Python microservices. Built on `aiokafka` and `pydantic`.

## Installation

```bash
pip install broker
```

## Quick Start

### Consumer

```python
import asyncio
from pydantic import BaseModel
from kafka.consumers.base_consumer import BaseConsumer

class OrderEvent(BaseModel):
    order_id: int
    amount: float

class OrderConsumer(BaseConsumer[OrderEvent]):
    topic = "orders"
    group_id = "orders-group"
    schema_class = OrderEvent

    async def process_message(self, event: OrderEvent):
        print(f"Processing order {event.order_id}")

async def main():
    consumer = OrderConsumer("localhost:9092")
    async with consumer:
        await asyncio.sleep(60)
```

### Producer

```python
from pydantic import BaseModel
from kafka.producers.base_producer import BaseProducer
from kafka.client.kafka_client import KafkaClient

class OrderEvent(BaseModel):
    order_id: int
    amount: float

class OrderProducer(BaseProducer):
    topic = "orders"

async def main():
    client = KafkaClient("localhost:9092")
    await client.start()

    producer = OrderProducer(client)
    await producer.publish(OrderEvent(order_id=1, amount=99.99))
```

## Features

### Retries & Error Policy

```python
class OrderConsumer(BaseConsumer[OrderEvent]):
    topic = "orders"
    group_id = "orders-group"
    schema_class = OrderEvent
    max_processing_retries = 3
    retry_backoff = 1.0       # 1s, 2s, 3s
    on_error = ErrorPolicy.SKIP  # or FAIL (default)
```

### Dead Letter Queue

```python
class OrderConsumer(BaseConsumer[OrderEvent]):
    def __init__(self):
        super().__init__(
            "localhost:9092",
            dlq_handler=self._on_dlq,
        )

    async def _on_dlq(self, value: bytes, error: Exception):
        await my_dlq_producer.send(value)
```

### Schema Versioning

```python
class OrderConsumer(BaseConsumer[OrderEvent]):
    schema_version = 2
    migrations = {
        1: lambda d: {**d, "currency": "USD"},
    }
```

Messages are serialized as `{"_schema_version": N, "data": {...}}`. On deserialization, migrations are applied automatically.

### Reconnection

```python
class OrderConsumer(BaseConsumer[OrderEvent]):
    max_reconnect_attempts = 10   # 0 = infinite (default)
    reconnect_backoff = 1.0       # exponential: 1s, 2s, 4s, 8s...
```

### Deduplication

```python
class OrderConsumer(BaseConsumer[OrderEvent]):
    def __init__(self):
        super().__init__(
            "localhost:9092",
            deduplicator=self._is_duplicate,
        )

    async def _is_duplicate(self, key: bytes | None, value: bytes | None) -> bool:
        return key in self._seen
```

### Middleware

```python
from kafka.middleware import ConsumerMiddleware

class MetricsMiddleware(ConsumerMiddleware[OrderEvent]):
    async def before_consume(self, event: OrderEvent) -> OrderEvent:
        self._start = time.monotonic()
        return event

    async def after_consume(self, event: OrderEvent):
        duration = time.monotonic() - self._start
        metrics.histogram("process_duration", duration)

    async def on_consume_error(self, event: OrderEvent, error: Exception, value: bytes) -> bool:
        metrics.counter("process_errors").inc()
        return False  # fall through to error policy

class OrderConsumer(BaseConsumer[OrderEvent]):
    def __init__(self):
        super().__init__(
            "localhost:9092",
            middleware=[MetricsMiddleware()],
        )
```

### Health Check

```python
consumer = OrderConsumer("localhost:9092")
status: HealthStatus = consumer.health()
# status.alive, status.messages_consumed
```

### Multi-Topic Consumer

```python
class MultiTopicConsumer(BaseConsumer[OrderEvent]):
    topic = ["orders", "order-events"]
    group_id = "orders-group"
    schema_class = OrderEvent

    async def process_message(self, event: OrderEvent):
        ...
```

## Lifecycle

All main classes (`KafkaClient`, `BaseConsumer`, `BaseProducer`) implement `ClientLifecycle`:

```python
# Option 1: async with
async with consumer:
    ...

# Option 2: manual
await consumer.start()
try:
    ...
finally:
    await consumer.stop()
```

`BaseConsumer.start()` is a long-running coroutine. The `async with` block runs it in a background task.

## API

### `BaseConsumer[T]`

| Attribute | Default | Description |
|---|---|---|
| `topic` | — | Topic(s) to subscribe to (`str \| list[str]`) |
| `group_id` | — | Consumer group ID |
| `schema_class` | — | `pydantic.BaseModel` subclass for deserialization |
| `enable_auto_commit` | `False` | Kafka auto-commit |
| `max_processing_retries` | `3` | Retries per message |
| `retry_backoff` | `1.0` | Seconds between retries (× attempt number) |
| `process_timeout` | `30.0` | Timeout per `process_message` (`None` = no timeout) |
| `connection_timeout` | `30.0` | Connect/stop timeout |
| `poll_timeout` | `1.0` | Poll wait time |
| `commit_interval_messages` | `100` | Commit batch size |
| `commit_interval_seconds` | `5.0` | Commit batch interval |
| `on_error` | `FAIL` | `FAIL`, `SKIP` |
| `schema_version` | `1` | Schema version for envelope |
| `migrations` | `{}` | Schema migration functions |
| `max_reconnect_attempts` | `0` | Max reconnects (0 = infinite) |
| `reconnect_backoff` | `1.0` | Base reconnect backoff |

### `BaseProducer`

| Attribute | Default | Description |
|---|---|---|
| `topic` | — | Topic to produce to |
| `publish_timeout` | `10.0` | Timeout for `send_and_wait` (`None` = no timeout) |
| `schema_version` | `1` | Schema version in envelope |

### `KafkaClient`

```python
KafkaClient(bootstrap_servers: str, **producer_options)
```

Manages a single `AIOKafkaProducer` instance. Used by `BaseProducer`.

## Exceptions

```
BrokerError
├── ConsumerError
├── ConnectionError
├── ConfigurationError
├── ProcessingError
└── SerializationError
```

## Development

```bash
pip install -e ".[dev]"
pytest tests/
ruff check .
mypy kafka/
```
