Metadata-Version: 2.3
Name: kafka_rpc_pubsub
Version: 1.0.1
Summary: Pydantic-typed Kafka pub/sub with RPC semantics — Producer / Consumer roles over kfk_mng
License: MIT
Keywords: kafka,pubsub,rpc,pydantic,async
Author: Aleksandr Yurlov
Author-email: Sasha.yur@mail.ru
Requires-Python: >=3.11,<4.0
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Framework :: Pydantic :: 2
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Dist: aiokafka (>=0.11.0,<0.12.0)
Requires-Dist: kfk_mng (>=1.0)
Requires-Dist: pydantic (>=2.0.0,<3.0.0)
Description-Content-Type: text/markdown

# kafka_rpc_pubsub

Pydantic-typed Kafka pub/sub with RPC semantics. A thin layer on top of [`kfk_mng`](https://pypi.org/project/kfk_mng/) that provides typed `KafkaProducer[T]` / `KafkaConsumer[T]` for inter-service async events.

## Installation

```bash
pip install kafka_rpc_pubsub
```

## Features

- ✅ Generic over Pydantic v2: `KafkaProducer[MyMessage]`, `KafkaConsumer[MyMessage]`
- ✅ ABC + concrete subclasses: Producer (publish only), Consumer (subscribe only)
- ✅ JSON serialization via `model_dump_json()` / `model_validate_json()`
- ✅ Manual offset commit
- ✅ Consumer groups for parallel processing
- ✅ Layered API: `KafkaProtocol → PubSub[T] → BaseKafkaService[T] → Producer/Consumer`
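The wire format is plain Pydantic JSON, so serialization can be checked without a broker. A minimal round-trip sketch of what the library does when publishing and consuming (needs only `pydantic`, no Kafka):

```python
from pydantic import BaseModel

class EventMessage(BaseModel):
    id: str
    payload: dict

msg = EventMessage(id="x", payload={"a": 1})
wire = msg.model_dump_json()                      # what gets written to the topic
restored = EventMessage.model_validate_json(wire) # what the consumer reconstructs
assert restored == msg
```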

## Quick start

```python
import asyncio

import kfk_mng
from pydantic import BaseModel

from kafka_rpc_pubsub import KafkaProducer, KafkaConsumer

# 1. Register the connection via kfk_mng
kfk_mng.register(
    alias="default",
    bootstrap_servers="kafka:9092",
    enable_producer=True,
    enable_consumer=True,
    consumer_topics=["events"],
    consumer_group_id="my-workers",
)

# 2. Describe the message with Pydantic
class EventMessage(BaseModel):
    id: str
    payload: dict


# 3. Create a role (Producer / Consumer)
class EventProducer(KafkaProducer[EventMessage]):
    model = EventMessage


class EventConsumer(KafkaConsumer[EventMessage]):
    model = EventMessage


# 4. Use them inside an async context
async def main():
    producer = EventProducer(topic="events")
    partition, offset = await producer.call(EventMessage(id="x", payload={"a": 1}))

    consumer = EventConsumer(topic="events", group_id="my-workers")
    while True:
        result = await consumer.process()
        if result.message:
            await handle(result.message)  # your business logic
            await consumer.commit(result)

asyncio.run(main())
```

## API

### Layered architecture

```
kfk_mng.KafkaComponents       (raw aiokafka)
    ↓
KafkaProtocol                 (raw send / consume / commit)
    ↓
PubSub[T]                     (Pydantic-typed publish / subscribe / commit)
    ↓
BaseKafkaService[T]           (ABC, integrates alias, group_id, model)
    ↓
KafkaProducer[T] / KafkaConsumer[T]   (roles)
```

### Classes

| Class | Description |
|---|---|
| `KafkaProducer[T]` | Publish only: `await call(message)` → `(partition, offset)` |
| `KafkaConsumer[T]` | Consume only: `await process()` → `ConsumeResult[T]` |
| `BaseKafkaService[T]` | ABC with a `model: type[T]` classvar and a `get_key(message)` abstractmethod |
| `PubSub[T]` | Low-level `publish` / `subscribe` / `commit` |
| `ConsumeResult[T]` | dataclass: `offset` / `partition` / `topic` / `message` |
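The shape of `ConsumeResult[T]` described in the table can be sketched as follows (an illustration only; the real class lives in `kafka_rpc_pubsub`, and field order is an assumption):

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")

@dataclass
class ConsumeResult(Generic[T]):
    # Fields as documented: where the record came from, plus the decoded message.
    offset: int
    partition: int
    topic: str
    message: Optional[T]  # None when nothing was fetched -> skip commit

r = ConsumeResult(offset=7, partition=0, topic="events", message=None)
assert r.message is None
```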

### Partitioning key

By default, `get_key(message)` returns `str(message.id)` or `str(message.oid)`. Override it to change partitioning:

```python
class OrderProducer(KafkaProducer[OrderMessage]):
    model = OrderMessage

    def get_key(self, message: OrderMessage) -> str:
        return message.customer_id  # partition by customer
```
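The default key resolution described above can be sketched as a standalone function (an illustration of the documented fallback, not the library's actual code):

```python
def default_get_key(message) -> str:
    # Documented fallback order: prefer `id`, then `oid`.
    for attr in ("id", "oid"):
        value = getattr(message, attr, None)
        if value is not None:
            return str(value)
    raise AttributeError("message has neither `id` nor `oid`; override get_key()")

class Order:
    def __init__(self, oid: int) -> None:
        self.oid = oid

assert default_get_key(Order(42)) == "42"
```

Because the key determines the partition, all messages sharing an `id`/`oid` land on the same partition and are processed in order.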

## Related libraries

- [`kfk_mng`](https://pypi.org/project/kfk_mng/) — connection manager (required dependency)
- Inspired by [`redis_rpc_pubsub`](https://github.com/AlexYrlv/redis_rpc_pubsub) (same API, over Redis)

## License

MIT

