Metadata-Version: 2.4
Name: aio-pika-batch
Version: 0.1.0
Summary: Batch consumer for aio-pika with per-message acknowledgment, retry tracking, dead-letter routing, and worker scaling.
Project-URL: Homepage, https://github.com/g-mill/aio-pika-batch
Project-URL: Documentation, https://github.com/g-mill/aio-pika-batch#readme
Project-URL: Repository, https://github.com/g-mill/aio-pika-batch
Project-URL: Issues, https://github.com/g-mill/aio-pika-batch/issues
Author: Griffin Miller, Cosmos Entity Inc.
License-Expression: MIT
License-File: LICENSE
Keywords: aio-pika,amqp,async,batch,consumer,message-queue,rabbitmq
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Networking
Classifier: Typing :: Typed
Requires-Python: >=3.12
Requires-Dist: aio-pika>=9.0
Requires-Dist: pydantic-settings>=2.0
Requires-Dist: structlog>=23.0
Provides-Extra: dev
Requires-Dist: basedpyright>=1.10; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest-cov>=5.0; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff>=0.4; extra == 'dev'
Requires-Dist: testcontainers[rabbitmq]>=4.0; extra == 'dev'
Description-Content-Type: text/markdown

# aio-pika-batch

Batch consumer for [aio-pika](https://github.com/mosquito/aio-pika) with per-message acknowledgment, retry tracking, dead-letter routing, and worker scaling.

**The problem:** No mainstream Python library offers a batch consumer for RabbitMQ. aio-pika gives you queue iterators and manual acks; Celery, dramatiq, FastStream, and pika offer no batch processing with per-message result semantics. You end up rebuilding it from scratch every time.

**The solution:** Subclass `BatchConsumer`, implement `process_messages` as an async generator, yield `MessageResult` per message. The library handles batching, acknowledgment, retries, dead-letter routing, and worker scaling.

## Quick Start

```bash
pip install aio-pika-batch
```

```python
import json

from aio_pika_batch import BatchConsumer, BatchConsumerSettings, MessageResult

# publish() and ServiceUnavailable below stand in for your own integration code.

class MyConsumer(BatchConsumer):
    async def process_messages(self, messages):
        for msg in messages:
            try:
                data = json.loads(msg.body)
                await publish(data)
                yield MessageResult.ACK
            except json.JSONDecodeError:
                yield MessageResult.NACK  # Bad data, send to DLX
            except ServiceUnavailable:
                yield MessageResult.REQUEUE  # Retry later

consumer = MyConsumer(BatchConsumerSettings(
    url="amqp://guest:guest@localhost:5672/",
    queue_name="my-queue",
    exchange_name="my-exchange",
    exchange_type="fanout",
    batch_size=50,
    batch_timeout=2.0,
    prefetch_count=300,
    num_workers=6,
))
await consumer.run()
```

## Features

| Feature | Description |
|---|---|
| **Batch collection** | Flush by size or timeout, whichever comes first |
| **Per-message results** | ACK, NACK, REQUEUE, or REQUEUE_IMMEDIATE per message in a batch |
| **Retry tracking** | `x-retry-count` header, configurable max attempts, then DLX |
| **Dead-letter routing** | Exhausted retries and NACKed messages go to DLX |
| **Worker scaling** | `num_workers=N` spawns N independent consumers on the same queue |
| **Connection recovery** | Exponential backoff reconnection per worker |
| **Graceful shutdown** | SIGINT/SIGTERM drain the current batch before stopping |
| **Lifecycle hooks** | `on_start()` / `on_stop()` for setup and cleanup |
| **Health checks** | `is_healthy` / `is_running` properties |
| **Configuration** | pydantic-settings — load from env vars, `.env` files, or code |

## Message Results

| Result | When to use | Broker action |
|---|---|---|
| `ACK` | Success | Remove from queue |
| `NACK` | Bad data, don't retry | Send to DLX (or discard) |
| `REQUEUE` | Transient failure | Republish to back of queue with retry count |
| `REQUEUE_IMMEDIATE` | Immediate retry (use sparingly) | Native requeue to front, no retry tracking |

### How REQUEUE works

`REQUEUE` republishes the message to the **back** of the queue with an incremented `x-retry-count` header. This provides natural backoff — other messages are processed while the failed one waits. When `retry_count >= max_requeue_attempts` (default: 3), the message is sent to the Dead Letter Exchange instead.

`REQUEUE_IMMEDIATE` uses RabbitMQ's native `nack(requeue=True)` which puts the message at the **front** of the queue with no retry tracking. This can cause infinite retry storms — only use it when you have external retry tracking or need immediate redelivery for very short transient failures.
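The retry-count logic above can be sketched as a pure function. `Action` and `decide` here are illustrative stand-ins, not the library's internals:

```python
from enum import Enum, auto

class Action(Enum):
    # Illustrative stand-in for the broker actions described above
    REPUBLISH_TO_BACK = auto()  # REQUEUE: back of queue, count incremented
    DEAD_LETTER = auto()        # retries exhausted: route to DLX

def decide(headers: dict, max_requeue_attempts: int = 3) -> Action:
    # x-retry-count is absent on first delivery, so treat missing as 0
    retry_count = int(headers.get("x-retry-count", 0))
    if retry_count >= max_requeue_attempts:
        return Action.DEAD_LETTER
    return Action.REPUBLISH_TO_BACK
```

A fresh message (no header) republishes to the back; once the header reaches `max_requeue_attempts`, the message is dead-lettered instead.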

## Chaining Consumers: Fan-In Pipelines

The most powerful pattern is chaining batch consumers into a pipeline. Multiple source consumers read from different exchanges, merge into a single staging queue, and a writer consumer drains to an external sink (Pub/Sub, BigQuery, S3, etc.):

```
RabbitMQ Exchanges              Staging              Sink
┌──────────────┐
│ user-events  │──┐
├──────────────┤  │         ┌──────────┐       ┌──────────────┐
│ impressions  │──┼────────→│ staging  │──────→│ Pub/Sub / BQ │
├──────────────┤  │         │  queue   │       │  / S3 / DB   │
│ search-logs  │──┘         └──────────┘       └──────────────┘
└──────────────┘
  SourceConsumer (×N)      WriterConsumer (×1)
  batch_size=50             batch_size=100
  num_workers=6             num_workers=4
```

Each consumer in the chain is a `BatchConsumer`. The source consumers read from their exchange, wrap the message, and publish to a shared fanout staging exchange. The writer consumer reads from the staging queue and pushes to the final destination:

```python
import asyncio
import json

from aio_pika import ExchangeType, Message
from aio_pika_batch import BatchConsumer, BatchConsumerSettings, MessageResult

STAGING_EXCHANGE = "my-app.staging"

class SourceConsumer(BatchConsumer):
    """Read from a source exchange, forward to staging."""

    async def _run_internal(self, channel, queue, settings, stop_event, log):
        # Declare the shared staging exchange on our channel
        self._staging = await channel.declare_exchange(
            STAGING_EXCHANGE, type=ExchangeType.FANOUT, durable=True,
        )
        await super()._run_internal(channel, queue, settings, stop_event, log)

    async def process_messages(self, messages):
        for msg in messages:
            envelope = json.dumps({
                "source": self.settings.exchange_name,
                "payload": msg.body.decode(),
            }).encode()
            await self._staging.publish(Message(body=envelope), routing_key="")
            yield MessageResult.ACK

class WriterConsumer(BatchConsumer):
    """Read from staging, write to external sink."""

    async def process_messages(self, messages):
        # Publish all concurrently; write_to_sink() stands in for your sink client
        results = await asyncio.gather(
            *[write_to_sink(msg.body) for msg in messages],
            return_exceptions=True,
        )
        for result in results:
            if isinstance(result, Exception):
                yield MessageResult.REQUEUE
            else:
                yield MessageResult.ACK

# Start source consumers for each exchange
async def main():
    exchanges = ["user-events", "impressions", "search-logs"]
    consumers = []

    for exchange in exchanges:
        consumers.append(SourceConsumer(BatchConsumerSettings(
            queue_name=f"ingestor:{exchange}",
            exchange_name=exchange,
            exchange_type="fanout",
            batch_size=50, num_workers=6, prefetch_count=300,
        )))

    # Writer reads from the staging queue
    consumers.append(WriterConsumer(BatchConsumerSettings(
        queue_name="staging-writer",
        exchange_name=STAGING_EXCHANGE,
        exchange_type="fanout",
        batch_size=100, num_workers=4, prefetch_count=400,
    )))

    await asyncio.gather(*[c.run() for c in consumers])
```

This pattern gives you:
- **Fan-in**: N sources merge into 1 staging queue
- **Backpressure**: each stage has independent batch size, workers, and prefetch
- **Isolation**: source failures don't block the writer, writer failures don't block sources
- **Per-message guarantees**: ACK/NACK/REQUEUE at every stage

See [`examples/rmq_to_pubsub/`](examples/rmq_to_pubsub/) for a complete Google Cloud Pub/Sub implementation of this pattern.

## Single Message Consumer

For simple cases where you don't need batching:

```python
import json

from aio_pika_batch import Consumer, ConsumerSettings, MessageResult

class MyConsumer(Consumer):
    async def process_message(self, message) -> MessageResult:
        data = json.loads(message.body)
        await save(data)  # save() stands in for your own persistence call
        return MessageResult.ACK

consumer = MyConsumer(ConsumerSettings(
    queue_name="my-queue",
    exchange_name="my-exchange",
))
await consumer.run()
```

## Configuration

All settings can be loaded from environment variables via [pydantic-settings](https://docs.pydantic.dev/latest/concepts/pydantic_settings/):

```python
from pydantic_settings import SettingsConfigDict
from aio_pika_batch import BatchConsumerSettings

class MySettings(BatchConsumerSettings):
    model_config = SettingsConfigDict(env_prefix="MY_APP_")

# Reads MY_APP_QUEUE_NAME, MY_APP_BATCH_SIZE, etc. from environment
settings = MySettings()
```
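The env-prefix behavior can be illustrated without the library. This is a minimal stand-in for what pydantic-settings does, not pydantic-settings itself:

```python
import os

def load_settings(prefix: str, fields: dict) -> dict:
    # Mimics the env_prefix behavior: each field's default is overridden
    # by a PREFIX_FIELDNAME environment variable when one is set.
    out = {}
    for name, default in fields.items():
        raw = os.environ.get(f"{prefix}{name.upper()}")
        out[name] = type(default)(raw) if raw is not None else default
    return out

os.environ["MY_APP_BATCH_SIZE"] = "50"
settings = load_settings("MY_APP_", {"queue_name": "my-queue", "batch_size": 10})
# settings == {"queue_name": "my-queue", "batch_size": 50}
```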

### Settings Reference

| Setting | Default | Description |
|---|---|---|
| `url` | `amqp://guest:guest@localhost:5672/` | AMQP connection URL |
| `queue_name` | *(required)* | Queue to consume from |
| `exchange_name` | `None` | Exchange to declare and bind |
| `exchange_type` | `direct` | Exchange type (direct, fanout, topic, headers) |
| `routing_key` | `None` | Routing key for binding |
| `prefetch_count` | `10` | QoS prefetch per channel |
| `num_workers` | `1` | Parallel consumer workers |
| `batch_size` | `10` | Max messages per batch |
| `batch_timeout` | `5.0` | Seconds before flushing incomplete batch |
| `max_requeue_attempts` | `3` | Retries before sending to DLX |
| `unhandled_exception_action` | `REQUEUE` | Default action for unhandled exceptions |
| `queue_durable` | `True` | Queue survives broker restarts |
| `exchange_durable` | `True` | Exchange survives broker restarts |
| `queue_arguments` | `None` | Extra args (TTL, DLX, etc.) |

### Tuning Tips

- Set `prefetch_count >= batch_size * num_workers` so the broker delivers enough messages to fill batches
- `batch_timeout` controls latency for low-traffic periods — lower means faster flushes of partial batches
- `num_workers` scales consumption linearly up to the point where the broker or downstream becomes the bottleneck
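For example, the Quick Start settings follow the first rule:

```python
batch_size = 50   # messages per batch
num_workers = 6   # parallel consumers on the queue

# Per the rule above, the broker should keep at least one full batch
# in flight for every worker:
min_prefetch = batch_size * num_workers  # 300, the Quick Start's prefetch_count
```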

## Lifecycle

Three ways to run:

```python
# 1. run() — blocks with signal handling
await consumer.run()

# 2. start()/stop() — manual control
await consumer.start()
# ... later ...
await consumer.stop()

# 3. Async context manager
async with consumer:
    await asyncio.Event().wait()
```

### Hooks

Override `on_start` and `on_stop` for setup/cleanup:

```python
class MyConsumer(BatchConsumer):
    async def on_start(self):
        self.http = aiohttp.ClientSession()

    async def on_stop(self):
        await self.http.close()

    async def process_messages(self, messages):
        ...
```

## Comparison

| Feature | aio-pika-batch | aio-pika | MassTransit (.NET) | Spring AMQP (Java) |
|---|---|---|---|---|
| Batch consumer | Yes | No | Yes | Yes |
| Per-message results in batch | **Yes** | N/A | No (atomic) | No (atomic) |
| Retry tracking | `x-retry-count` header | Manual | Built-in | Built-in |
| DLX routing | Automatic | Manual | Automatic | Automatic |
| Worker scaling | `num_workers` | Manual | Concurrent consumers | `concurrentConsumers` |
| Async Python | Yes | Yes | No (C#) | No (Java) |

## Examples

- [`examples/simple/`](examples/simple/) — Single-message consumer
- [`examples/batch/`](examples/batch/) — Batch consumer with concurrent processing
- [`examples/rmq_to_pubsub/`](examples/rmq_to_pubsub/) — Full RabbitMQ → Google Cloud Pub/Sub pipeline

## Requirements

- Python 3.12+
- RabbitMQ 3.8+

## License

MIT
