Metadata-Version: 2.4
Name: pronto-kafka
Version: 0.0.1
Summary: Async Kafka producer/consumer helpers for FastAPI projects
Author-email: Til <info@justtil.dev>
License: Copyright (c) 2026 Til Schwarze
        Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
License-File: LICENSE
Keywords: aiokafka,async,consumer,fastapi,kafka,producer
Requires-Python: >=3.10
Requires-Dist: aiokafka
Provides-Extra: dev
Requires-Dist: pytest; extra == 'dev'
Requires-Dist: pytest-asyncio; extra == 'dev'
Requires-Dist: ruff; extra == 'dev'
Description-Content-Type: text/markdown

# pronto-kafka

Async Kafka producer/consumer helpers for FastAPI (and any async Python) projects. Wraps [aiokafka](https://aiokafka.readthedocs.io/) with a module-level producer, context-manager consumers, and ready-made FastAPI lifespan hooks.

## Installation

```bash
pip install pronto-kafka
```

## Quick start — FastAPI producer

```python
import json

from fastapi import FastAPI
from pronto_kafka.v1 import make_producer_lifespan, send

app = FastAPI(lifespan=make_producer_lifespan("KAFKA_BOOTSTRAP_SERVERS"))

@app.post("/orders")
async def create_order(order: dict):
    # Serialise the payload to bytes before sending it to the "orders" topic.
    await send("orders", json.dumps(order).encode())
    return {"status": "queued"}
```

Set your bootstrap servers in the environment:

```
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
```

`make_producer_lifespan` accepts any environment variable name (for example `MY_KAFKA_SERVERS`), so you can use whatever fits your project.

## Quick start — consumer

```python
from pronto_kafka.v1 import create_consumer_from_env

async def process_orders():
    async with create_consumer_from_env("orders", group_id="order-svc") as consumer:
        async for msg in consumer:
            print(msg.topic, msg.value)
```

## Quick start — consumer as FastAPI background task

```python
from fastapi import FastAPI
from pronto_kafka.v1 import make_consumer_lifespan

async def handle(msg):
    print(f"received: {msg.value}")

app = FastAPI(
    lifespan=make_consumer_lifespan(
        "orders",
        group_id="order-svc",
        handler=handle,
    )
)
```

## Producer API

### `make_producer_lifespan(env_var="KAFKA_BOOTSTRAP_SERVERS", **kwargs)`

Returns a FastAPI-compatible lifespan that starts the producer on startup and stops it on shutdown. Extra `kwargs` are forwarded to `AIOKafkaProducer`.

### `init_producer(bootstrap_servers, **kwargs)`

Start the module-level producer with an explicit bootstrap-servers string.

### `init_producer_from_env(env_var="KAFKA_BOOTSTRAP_SERVERS", **kwargs)`

Start the producer by reading bootstrap servers from an environment variable. Raises `RuntimeError` if the variable is missing or empty.
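
The environment lookup described above could be sketched roughly as follows. This is an illustration only, not the library's actual code; `read_bootstrap_servers` is a hypothetical helper name that does not exist in pronto-kafka.

```python
import os


def read_bootstrap_servers(env_var: str = "KAFKA_BOOTSTRAP_SERVERS") -> str:
    """Return the bootstrap servers from the environment or raise RuntimeError.

    Hypothetical helper for illustration; not part of pronto-kafka.
    """
    value = os.environ.get(env_var, "")
    if not value:
        # Covers both an unset variable and one set to an empty string.
        raise RuntimeError(f"Environment variable {env_var!r} is missing or empty")
    return value
```

Failing at startup with a clear message is usually easier to debug than a connection error raised later by the Kafka client.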

### `close_producer()`

Stop and tear down the module-level producer.

### `get_producer() -> AIOKafkaProducer`

Return the active producer instance. Raises `RuntimeError` if not initialised.
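
A module-level accessor that raises when nothing has been initialised is a common singleton pattern. The sketch below shows the general idea with a placeholder object; the names `set_instance`, `get_instance`, and `clear_instance` are hypothetical and are not taken from pronto-kafka's source.

```python
from typing import Any, Optional

# Module-level slot holding the single shared instance (None until initialised).
_instance: Optional[Any] = None


def set_instance(obj: Any) -> None:
    """Store the shared instance (stands in for starting a producer)."""
    global _instance
    _instance = obj


def get_instance() -> Any:
    """Return the shared instance, or raise if it has not been set."""
    if _instance is None:
        raise RuntimeError("Instance is not initialised; call set_instance() first")
    return _instance


def clear_instance() -> None:
    """Drop the shared instance (stands in for stopping a producer)."""
    global _instance
    _instance = None
```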

### `send(topic, value=None, key=None, headers=None, partition=None, **kwargs) -> RecordMetadata`

Convenience wrapper around `producer.send_and_wait(...)`. Raises `RuntimeError` if the producer is not initialised.

```python
meta = await send("events", b'{"type": "login"}', key=b"user-42")
print(meta.topic, meta.partition, meta.offset)
```
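
The examples in this README pass values as `bytes`. If your payloads are JSON, a small pair of helpers keeps the encoding in one place. This is a minimal sketch; `encode_json` and `decode_json` are hypothetical names, not part of pronto-kafka.

```python
import json
from typing import Any


def encode_json(payload: Any) -> bytes:
    """Serialise a JSON-compatible object to UTF-8 bytes (hypothetical helper)."""
    # Compact separators and sorted keys keep the output small and deterministic.
    return json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")


def decode_json(raw: bytes) -> Any:
    """Inverse of encode_json: parse UTF-8 JSON bytes back into Python objects."""
    return json.loads(raw.decode("utf-8"))
```

With these in place, a call could look like `await send("events", encode_json({"type": "login"}), key=b"user-42")`, and a consumer handler could call `decode_json(msg.value)`.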

## Consumer API

### `create_consumer(*topics, group_id, bootstrap_servers, **kwargs)`

Async context manager that yields a started `AIOKafkaConsumer`. Stops the consumer on exit.

```python
async with create_consumer("orders", group_id="svc", bootstrap_servers="localhost:9092") as consumer:
    async for msg in consumer:
        process(msg)
```

### `create_consumer_from_env(*topics, group_id, env_var="KAFKA_BOOTSTRAP_SERVERS", **kwargs)`

Same as `create_consumer` but reads bootstrap servers from an environment variable.

### `make_consumer_lifespan(*topics, group_id, env_var="KAFKA_BOOTSTRAP_SERVERS", handler, **kwargs)`

Returns a FastAPI-compatible lifespan that runs a consumer loop as an `asyncio` background task. Supply an async `handler` function; it is called with each `ConsumerRecord`.

```python
async def handle(msg):
    await process(msg.value)

app = FastAPI(lifespan=make_consumer_lifespan("orders", group_id="svc", handler=handle))
```
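
To illustrate the general pattern of running a loop as a background task for the lifetime of the app, here is a minimal sketch that uses a generic async iterator in place of a Kafka consumer. It is not pronto-kafka's implementation; `make_background_lifespan` and its `source` parameter are hypothetical names.

```python
import asyncio
from contextlib import asynccontextmanager, suppress
from typing import Any, AsyncIterator, Awaitable, Callable


def make_background_lifespan(
    source: Callable[[], AsyncIterator[Any]],
    handler: Callable[[Any], Awaitable[None]],
):
    """Build a lifespan that feeds items from source() to handler in a task."""

    async def _loop() -> None:
        async for item in source():
            await handler(item)

    @asynccontextmanager
    async def lifespan(app: Any):
        # Start the loop when the application starts up.
        task = asyncio.create_task(_loop())
        try:
            yield
        finally:
            # On shutdown, cancel the loop and wait for it to finish unwinding.
            task.cancel()
            with suppress(asyncio.CancelledError):
                await task

    return lifespan
```

Cancelling the task and then awaiting it gives the loop a chance to run its cleanup before the application exits.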

## Manual lifespan (without helpers)

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from pronto_kafka.v1 import init_producer_from_env, close_producer

@asynccontextmanager
async def lifespan(app):
    await init_producer_from_env("KAFKA_BOOTSTRAP_SERVERS")
    try:
        yield
    finally:
        # Stop the producer even if the application exits with an error.
        await close_producer()

app = FastAPI(lifespan=lifespan)
```

## Development

```bash
pip install -e ".[dev]"
pytest
```

Integration tests run automatically when `KAFKA_BOOTSTRAP_SERVERS` is set (e.g. pointing at a local Kafka running in Docker); otherwise they are skipped.

## Versioning

Versions are derived from git tags via [hatch-vcs](https://github.com/ofek/hatch-vcs).

```bash
git tag v0.1.0
hatch build
```
