Metadata-Version: 2.4
Name: redis-stream-queue
Version: 0.1.0
Summary: General-purpose async Redis Streams consumer group library with DLQ, crash recovery, and monitoring
Author: Min An
License: MIT
License-File: LICENSE
Keywords: async,consumer-group,dlq,queue,redis,streams,worker
Classifier: Framework :: AsyncIO
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.11
Requires-Dist: redis>=5.0.0
Provides-Extra: dev
Requires-Dist: fakeredis>=2.26; extra == 'dev'
Requires-Dist: hatchling; extra == 'dev'
Requires-Dist: msgpack>=1.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.24; extra == 'dev'
Requires-Dist: pytest>=8; extra == 'dev'
Requires-Dist: python-dotenv>=1.0; extra == 'dev'
Provides-Extra: msgpack
Requires-Dist: msgpack>=1.0; extra == 'msgpack'
Description-Content-Type: text/markdown

# redis-stream-queue

Async Python library for Redis Streams consumer groups with built-in crash recovery, DLQ, and monitoring.

## Features

- **Producer**: push messages from any number of pods — XADD is atomic, no coordination needed
- **Consumer**: callback-based read → ACK loop; partial ACK supported
- **Crash recovery**: XAUTOCLAIM cursor loop sweeps full PEL per iteration; NOGROUP auto-recovery if stream deleted externally
- **Dead-letter queue**: decode errors and poison pills (exceeding `max_deliveries`) routed to DLQ handler
- **Consumer metrics**: `consumer.metrics()` — `tps_in` / `tps_out` / `tps_total` (60s sliding window), avg TPS, read/acked/DLQ/error counters
- **Producer metrics**: `producer.metrics()` — push TPS, total pushed, avg TPS, uptime
- **Process-wide aggregation**: `StreamConsumer.all_metrics()` / `StreamProducer.all_metrics()` — collect from all live instances in this process via weakref registry; zero Redis overhead
- **Stream monitoring**: lag, PEL size, per-consumer idle time, health checks via `ConsumerGroup`
- **Pluggable serializers**: JSON (default), msgpack, pickle — or bring your own
- **Redis Cluster**: `from_cluster()` and `from_url()` factory methods
- **Multi-pod safe**: unique worker names auto-generated per pod (`{group}_{hostname}_{rand4}`)

---

## Requirements

- Python ≥ 3.11
- Redis ≥ 6.2 (XAUTOCLAIM support)

---

## Installation

```bash
pip install redis-stream-queue
```

With msgpack support:

```bash
pip install redis-stream-queue[msgpack]
```

---

## Quick Start

### 1. Start Redis

```bash
docker run -p 6379:6379 redis
```

### 2. Producer

```python
import asyncio
from redis_stream_queue import StreamClient, StreamProducer

async def main():
    client = StreamClient(host="localhost")
    producer = StreamProducer(client=client, stream="orders", group="order_workers")

    await producer.ensure_group()           # idempotent — safe to call on every startup
    msg_id = await producer.push({"order_id": 1})
    print(f"pushed: {msg_id}")
    await client.close()

asyncio.run(main())
```

### 3. Consumer

```python
import asyncio
from redis_stream_queue import StreamClient, StreamConsumer, ConsumerConfig

async def handle(messages):
    for msg in messages:
        print(f"processing: {msg.data}")
    return [m.id for m in messages]     # IDs to ACK; any ID left out of the list stays in the PEL

async def on_dlq(msg, reason):
    print(f"DLQ [{reason}]: {msg.data}")

async def main():
    client = StreamClient(host="localhost")
    config = ConsumerConfig(
        group="order_workers",
        dlq_stream="orders_dlq",
        batch_size=100,
        block_ms=5_000,
        max_deliveries=3,
    )
    consumer = StreamConsumer(
        client=client,
        stream="orders",
        config=config,
        handler=handle,
        dlq_handler=on_dlq,
    )
    await consumer.run()    # infinite loop; Ctrl-C / CancelledError to stop

asyncio.run(main())
```

> **Handler contract**: return a list of IDs to ACK. Return `[]` to ACK nothing (messages stay in PEL for retry). Never return `None` — that triggers a warning and no ACK.
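
The contract above also allows partial ACK. A minimal sketch, assuming messages expose `.id` and `.data` as in the consumer example; `FakeMessage` and `process_order` are hypothetical stand-ins, not part of the library:

```python
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for the library's message object (.id and .data)."""
    id: str
    data: dict


async def process_order(data: dict) -> None:
    """Hypothetical business logic: rejects payloads without an order_id."""
    if "order_id" not in data:
        raise ValueError("missing order_id")


async def handle(messages):
    """ACK only the messages that were processed successfully."""
    acked = []
    for msg in messages:
        try:
            await process_order(msg.data)
        except ValueError:
            # Leave this ID out: the message stays in the PEL and is retried.
            continue
        acked.append(msg.id)
    return acked  # always a list, never None
```

Catching the failure per message keeps one bad payload from blocking the ACK of the rest of the batch.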

### 4. Consumer Throughput Metrics

`consumer.metrics()` is non-blocking and makes no Redis calls — safe to poll from any monitoring loop or health endpoint.

```python
import asyncio
from redis_stream_queue import StreamClient, StreamConsumer, ConsumerConfig

async def monitor(consumer):
    while True:
        m = consumer.metrics()
        print(
            f"in={m.tps_in:.1f} msg/s  out={m.tps_out:.1f} msg/s  total={m.tps_total:.1f} msg/s  "
            f"avg={m.avg_tps:.1f} msg/s  "
            f"read={m.total_read}  acked={m.total_acked}  "
            f"dlq={m.total_dlq}  errors={m.total_errors}  "
            f"uptime={m.uptime_secs:.0f}s"
        )
        await asyncio.sleep(5)

async def main():
    client = StreamClient(host="localhost")
    config = ConsumerConfig(group="order_workers")
    # `handle` is the handler coroutine defined in the Consumer example above
    consumer = StreamConsumer(client=client, stream="orders", config=config, handler=handle)

    await asyncio.gather(consumer.run(), monitor(consumer))

asyncio.run(main())
```

| Field | Type | Description |
|---|---|---|
| `tps_in` | `float` | Reads/sec — XREADGROUP + XAUTOCLAIM, sliding 60s window |
| `tps_out` | `float` | Acked/sec — sliding 60s window |
| `tps_total` | `float` | `tps_in + tps_out` |
| `avg_tps` | `float` | `total_acked / uptime_secs` since first message |
| `total_read` | `int` | Messages pulled from stream (new + reclaimed via XAUTOCLAIM) |
| `total_acked` | `int` | Successfully processed and ACKed by handler |
| `total_dlq` | `int` | Routed to DLQ (`decode_error` + `max_deliveries` combined) |
| `total_errors` | `int` | Handler exceptions — message stays in PEL for retry |
| `uptime_secs` | `float` | Seconds since first message was processed |
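
To illustrate what a 60s sliding window means for `tps_in` and `tps_out`, here is a rough sketch of one way such a tracker could work. `SlidingTps` is a hypothetical class; the library's internal `_TpsTracker` may be implemented differently:

```python
import time
from collections import deque


class SlidingTps:
    """Hypothetical tracker: events per second over the last `window` seconds."""

    def __init__(self, window: float = 60.0, clock=time.monotonic):
        self.window = window
        self._clock = clock          # injectable so the tracker can be tested
        self._events = deque()       # (timestamp, count) pairs, oldest first

    def record(self, n: int = 1) -> None:
        """Register a batch of n events at the current time."""
        self._events.append((self._clock(), n))

    def rate(self) -> float:
        """Average events per second across the window."""
        cutoff = self._clock() - self.window
        # Drop batches that have fallen out of the window.
        while self._events and self._events[0][0] <= cutoff:
            self._events.popleft()
        return sum(n for _, n in self._events) / self.window
```

Both methods touch only in-memory state, which is consistent with `metrics()` making no Redis calls.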

### 5. Producer Throughput Metrics

```python
m = producer.metrics()
print(f"push={m.tps:.1f} msg/s  avg={m.avg_tps:.1f} msg/s  total={m.total_pushed}  uptime={m.uptime_secs:.0f}s")
```

| Field | Type | Description |
|---|---|---|
| `total_pushed` | `int` | Messages pushed since instance creation |
| `tps` | `float` | Pushed/sec — sliding 60s window |
| `avg_tps` | `float` | `total_pushed / uptime_secs` since first push |
| `uptime_secs` | `float` | Seconds since first push |

### 6. Process-Wide Metrics (Multiple Instances)

Each `StreamConsumer` and `StreamProducer` auto-registers in a process-level weakref registry on creation. Dead instances are evicted automatically by GC — no manual cleanup needed.

```python
# Multiple consumers in same asyncio event loop
c1 = StreamConsumer(client=client, stream="orders", config=cfg_a, handler=handle_a)
c2 = StreamConsumer(client=client, stream="payments", config=cfg_b, handler=handle_b)
c3 = StreamConsumer(client=client, stream="events", config=cfg_c, handler=handle_c)

await asyncio.gather(c1.run(), c2.run(), c3.run())

# From a monitoring task running concurrently:
for m in StreamConsumer.all_metrics():
    print(f"in={m.tps_in:.1f}  out={m.tps_out:.1f}  acked={m.total_acked}")

# Multiple producers
p1 = StreamProducer(client=client, stream="orders")
p2 = StreamProducer(client=client, stream="payments")

for m in StreamProducer.all_metrics():
    print(f"tps={m.tps:.1f}  pushed={m.total_pushed}")
```

> **Multi-pod note**: `all_metrics()` is in-process only — it sees instances in this pod, not other pods.
> For cross-pod aggregation, expose metrics via a health endpoint (FastAPI, Django, plain HTTP) and scrape with Prometheus or a similar tool. Each pod reports its own slice; your scraper aggregates across pods. Zero extra Redis IOPS.
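
The weakref registry idea can be sketched with the standard library alone. `Worker` below is a hypothetical class used only to show the pattern; it is not the library's implementation:

```python
import weakref


class Worker:
    """Hypothetical class that tracks its own live instances."""

    _instances = weakref.WeakSet()

    def __init__(self, name: str):
        self.name = name
        self.total = 0
        # Weak reference: registration does not keep the object alive.
        Worker._instances.add(self)

    def metrics(self) -> dict:
        return {"name": self.name, "total": self.total}

    @classmethod
    def all_metrics(cls) -> list:
        # Copy to a list first so garbage collection cannot resize the set mid-loop.
        return [w.metrics() for w in list(cls._instances)]
```

Once the last strong reference to an instance is dropped, it disappears from `all_metrics()` without any explicit unregister call.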

### 7. Stream / Group Monitoring

```python
from redis_stream_queue import StreamClient, ConsumerGroup

async def main():
    client = StreamClient(host="localhost")
    cg = ConsumerGroup(client, stream="orders", group="order_workers")

    # Stream-level stats (requires Redis calls)
    stats = await cg.stats(dlq_stream="orders_dlq")
    print(f"length={stats.stream_length}  lag={stats.lag}  pel={stats.group_pel_size}")
    for c in stats.consumers:
        print(f"  consumer={c.name}  pending={c.pending}  idle={c.idle_ms}ms")

    # Health check
    health = await cg.health_check(max_lag=1_000, max_idle_ms=60_000)
    print(f"healthy={health['healthy']}  issues={health['issues']}")

    # Inspect stuck messages
    pending = await cg.pending_details(count=50)
    for entry in pending:
        print(f"  {entry.id}  consumer={entry.consumer}  deliveries={entry.delivery_count}")
```

---

## Configuration Reference

### `StreamClient`

| Param | Default | Description |
|---|---|---|
| `host` | `"localhost"` | Redis host |
| `port` | `6379` | Redis port |
| `db` | `0` | Redis DB index |
| `username` | `None` | AUTH username |
| `password` | `None` | AUTH password |
| `prefix` | `""` | Key prefix prepended to all stream names (`{prefix}_{stream}`) |
| `max_connections` | `1000` | Connection pool size |
| `pool_timeout` | `5` | Seconds to wait for a free connection |
| `ssl` | `False` | Enable TLS |

**Cluster / URL variants:**

```python
# Redis Cluster
client = StreamClient.from_cluster(
    startup_nodes=[{"host": "node1", "port": 6379}],
    password="secret",
)

# URL — single node
client = StreamClient.from_url("redis://localhost:6379/0")
client = StreamClient.from_url("rediss://user:pass@host:6380/0")   # TLS

# URL — cluster
client = StreamClient.from_url("redis+cluster://node1:6379")
client = StreamClient.from_url("rediss+cluster://node1:6380")      # TLS cluster
```

> **Cluster key rule**: stream and DLQ stream must share a hash tag to land on the same slot:
> ```python
> stream="{orders}_main", dlq_stream="{orders}_dlq"
> ```
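
To check that two keys land on the same slot before deploying, the Redis Cluster slot function (CRC16-XMODEM of the key, or of the hash tag if present, modulo 16384) can be reproduced locally. `key_slot` is a helper written for this example, not a library function:

```python
import binascii


def key_slot(key: str) -> int:
    """Compute the Redis Cluster hash slot for a key."""
    raw = key.encode()
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        # Only a non-empty {tag} counts; otherwise the whole key is hashed.
        if end != -1 and end > start + 1:
            raw = raw[start + 1:end]
    # binascii.crc_hqx with initial value 0 is CRC16-XMODEM (polynomial 0x1021).
    return binascii.crc_hqx(raw, 0) % 16384


# Both keys hash only the "orders" tag, so they share a slot.
same_slot = key_slot("{orders}_main") == key_slot("{orders}_dlq")
```

Without the braces, `orders_main` and `orders_dlq` are hashed in full and will usually map to different slots.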

### `ConsumerConfig`

| Param | Default | Description |
|---|---|---|
| `group` | required | Consumer group name |
| `worker_name` | auto | Unique consumer name per pod. Auto = `{group}_{hostname}_{rand4}` (group part truncated if needed to preserve hostname+suffix) |
| `dlq_stream` | `None` | Stream name to route poison pills and decode errors to |
| `dlq_group` | `None` | Consumer group for the DLQ stream |
| `batch_size` | `100` | Max messages per XREADGROUP / XAUTOCLAIM call |
| `block_ms` | `5000` | XREADGROUP block timeout (ms) — 0 = non-blocking |
| `min_idle_claim_ms` | `10000` | XAUTOCLAIM idle threshold (ms). Set to at least 2× max handler latency |
| `max_deliveries` | `3` | Delivery count before message is routed to DLQ |
| `max_stream_size` | `100000` | Approximate XADD MAXLEN trim |
| `max_claim_passes` | `None` | Max XAUTOCLAIM cursor iterations per `run_once()`. `None` = sweep full PEL. Set to `1` to restore single-pass behavior |

---

## Serializers

```python
from redis_stream_queue import JsonSerializer, MsgpackSerializer, PickleSerializer

# Serializer must match on both producer and consumer
producer = StreamProducer(..., serializer=MsgpackSerializer())
consumer = StreamConsumer(..., serializer=MsgpackSerializer())
```

| Serializer | Extra | Notes |
|---|---|---|
| `JsonSerializer` | none | Default; human-readable, broadest compat |
| `MsgpackSerializer` | `[msgpack]` | Smaller wire size, faster encode/decode |
| `PickleSerializer` | none | Any Python type; requires same Python version on both ends; **do not use with untrusted data** |

A decode failure calls `dlq_handler(msg, "decode_error")` and is followed by an immediate XACK. The message is not retried, because corrupt data would fail on every attempt.

Custom serializer — implement the `Serializer` protocol:

```python
from redis_stream_queue import Serializer

class CborSerializer:
    def encode(self, data: dict) -> bytes:
        import cbor2
        return cbor2.dumps(data)

    def decode(self, raw: bytes) -> dict:
        import cbor2
        return cbor2.loads(raw)
```
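
A second sketch that needs no third-party package: zlib-compressed JSON. It assumes, as in the example above, that the protocol consists of `encode(dict) -> bytes` and `decode(bytes) -> dict`; `CompressedJsonSerializer` is a hypothetical name:

```python
import json
import zlib


class CompressedJsonSerializer:
    """Hypothetical serializer: JSON payloads compressed with zlib."""

    def encode(self, data: dict) -> bytes:
        return zlib.compress(json.dumps(data).encode("utf-8"))

    def decode(self, raw: bytes) -> dict:
        # zlib.error or ValueError raised here would count as a decode failure.
        return json.loads(zlib.decompress(raw).decode("utf-8"))
```

A round trip such as `s.decode(s.encode(payload)) == payload` is a cheap test to run before wiring a new serializer into both producer and consumer.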

---

## Multi-Pod Deployment

### Producers

No coordination needed. XADD is atomic; Redis assigns unique IDs (`{timestamp}-{seq}`). All pods write to the same stream key safely.
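
Auto-generated IDs encode a millisecond Unix timestamp and a sequence number, so they can be decoded when debugging. A small sketch, assuming the ID is available as a `str`; `parse_stream_id` is a helper written for this example:

```python
from datetime import datetime, timezone


def parse_stream_id(message_id: str) -> tuple[datetime, int]:
    """Split a Redis stream ID into its UTC timestamp and sequence number."""
    ms_part, _, seq_part = message_id.partition("-")
    timestamp = datetime.fromtimestamp(int(ms_part) / 1000, tz=timezone.utc)
    return timestamp, int(seq_part)


ts, seq = parse_stream_id("1700000000000-3")
# ts is 2023-11-14 22:13:20 UTC, seq is 3
```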

### Consumers

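Each pod gets a unique `worker_name`. Redis distributes new messages across the consumers in a group, so each message is delivered to one pod at a time. A message reaches a second pod only if it is reclaimed via XAUTOCLAIM, so handlers should tolerate redelivery.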

```
Pod A: worker_name = "order_workers_pod-a_3821" ─┐
Pod B: worker_name = "order_workers_pod-b_9174" ─┼─ group "order_workers"
Pod C: worker_name = "order_workers_pod-c_0042" ─┘
```
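
For reference, a name in the `{group}_{hostname}_{rand4}` format could be generated roughly as follows. This is a sketch only: `make_worker_name` and the `max_len` limit of 64 are assumptions made for the example, and the library's actual truncation rule may differ:

```python
import secrets
import socket


def make_worker_name(group: str, max_len: int = 64) -> str:
    """Build '{group}_{hostname}_{rand4}', trimming the group part if too long."""
    # Four random digits keep names unique across restarts on the same host.
    suffix = f"{socket.gethostname()}_{secrets.randbelow(10_000):04d}"
    room = max(max_len - len(suffix) - 1, 0)  # characters left for the group part
    return f"{group[:room]}_{suffix}"
```

Truncating the group rather than the hostname keeps the part of the name that identifies the pod intact.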

**Crash recovery**: if a pod crashes, its unacknowledged PEL messages go idle. Other pods reclaim them via XAUTOCLAIM after `min_idle_claim_ms`. Set this to at least 2× your maximum handler latency.

**NOGROUP recovery**: if the stream or group is deleted externally (e.g. `FLUSHALL`, `XGROUP DESTROY`), the consumer detects the `NOGROUP` error, clears its entry from the group registry, and re-creates the group on the next iteration — no manual restart required.

**Recommended settings for multi-pod:**

| Setting | Value | Reason |
|---|---|---|
| `worker_name` | auto (default) | Unique per pod |
| `min_idle_claim_ms` | 2× max handler latency | Avoid premature cross-pod reclaim |
| `block_ms` | `5000` | Balanced latency vs idle CPU |
| `max_deliveries` | `3–5` | Accounts for transient failures across restarts |

---

## Consumer Loop Internals

Each `run_once()` call executes four steps:

```
1. ensure()           — XGROUP CREATE mkstream; no-op if group known in class-level registry.
                        Registry entry removed on NOGROUP — re-creation runs on next iteration.

2. XREADGROUP ">"     — fetch new undelivered messages
   ├─ decode error    → dlq_handler(msg, "decode_error") + XACK (no retry)
   ├─ handler(msgs)   → XACK returned IDs; total_acked += n; tps_out tracker updated
   ├─ tps_in          updated with len(raw_messages)
   ├─ handler → None  → warning logged; no XACK (treat as explicit "ACK nothing")
   └─ unacked IDs     stay in PEL for XAUTOCLAIM recovery

3. XAUTOCLAIM cursor loop — reclaims msgs idle > min_idle_claim_ms
   ├─ follows cursor until Redis returns "0-0" (full PEL swept) or stall detected
   ├─ max_claim_passes caps iterations if set; None = unlimited (default)
   ├─ tps_in          updated with len(claimed) per batch
   └─ reclaimed msgs  → same handler → XACK; tps_out updated

4. XPENDING sweep     — find entries with delivery_count >= max_deliveries
   ├─ no dlq_handler  → warning logged with IDs; still ACKed (message cleared)
   ├─ msg missing from stream (XDEL'd) → warning logged; still ACKed
   └─ poison pills    → dlq_handler(msg, "max_deliveries") + batched XACK

run() wraps run_once() in an infinite loop:
   ├─ CancelledError  → re-raised immediately (clean shutdown)
   ├─ NOGROUP error   → registry entry cleared; sleep 1s; re-enter loop
   └─ any other error → logged; sleep 1s; re-enter loop
```

**Metrics updated per iteration:**
- `total_read` incremented on every XREADGROUP / XAUTOCLAIM batch
- `tps_in` sliding tracker records every read batch (new + reclaimed)
- `total_acked` incremented after each successful handler → XACK
- `tps_out` sliding tracker records every ack batch
- `total_dlq` incremented for decode errors + poison pill ACKs
- `total_errors` incremented for handler exceptions
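
The error handling of the `run()` wrapper described above can be sketched as a plain asyncio loop. `run_forever`, `run_once`, and `reset_group` are hypothetical names for this illustration; the real method may differ in detail:

```python
import asyncio
import logging

log = logging.getLogger("consumer_sketch")


async def run_forever(run_once, reset_group, retry_delay: float = 1.0) -> None:
    """Hypothetical wrapper mirroring the run() behaviour listed above."""
    while True:
        try:
            await run_once()
        except asyncio.CancelledError:
            raise  # clean shutdown: never swallow cancellation
        except Exception as exc:
            if "NOGROUP" in str(exc):
                reset_group()  # forget the group so the next pass re-creates it
            else:
                log.exception("run_once failed")
            await asyncio.sleep(retry_delay)  # back off before re-entering the loop
```

Re-raising `CancelledError` is what lets `task.cancel()` or Ctrl-C stop the consumer, while every other failure is treated as transient.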

---

## Sequence Diagrams

> Render with [PlantUML](https://plantuml.com/), VS Code [PlantUML extension](https://marketplace.visualstudio.com/items?itemName=jebbs.plantuml), or IntelliJ PlantUML plugin.

### 1. Producer: Push Message

```plantuml
@startuml
title Producer — Push Message

participant "App" as App
participant "StreamProducer" as P
participant "StreamClient" as C
database "Redis Stream" as R

App -> P : push(data: dict)
P -> P : serializer.encode(data) → bytes
P -> C : push(stream, encoded_bytes, max_len)
C -> R : XADD {stream} MAXLEN ~ {max_len}\n  * data {bytes}
R --> C : message_id (e.g. "1700000000000-0")
C --> P : message_id
P -> P : total_pushed += 1\ntps_tracker.record(1)
P --> App : message_id
@enduml
```

### 2. Consumer: Normal Message Processing

```plantuml
@startuml
title Consumer — Normal Message Processing (run_once step 2)

participant "StreamConsumer\nrun_once()" as Consumer
participant "StreamClient" as C
participant "handler()" as H
database "Redis PEL" as PEL

Consumer -> C : read(stream, group, worker_name,\n      count=batch_size, block_ms)
C -> PEL : XREADGROUP GROUP {group} {worker}\n  COUNT {n} BLOCK {ms} STREAMS {stream} >
PEL --> C : [(id1, fields), (id2, fields), ...]
note right of PEL : Messages enter PEL\n(pending until ACKed)
C --> Consumer : [StreamMessage, ...]

Consumer -> Consumer : tps_in.record(n)\ntotal_read += n
Consumer -> Consumer : serializer.decode(raw) per message
Consumer -> H : handler(decoded_messages)
H --> Consumer : [acked_ids]

Consumer -> C : ack(stream, group, *acked_ids)
C -> PEL : XACK {stream} {group} {id1} {id2} ...
note right of PEL : Messages removed from PEL
Consumer -> Consumer : total_acked += len(acked_ids)\ntps_out.record(n)

note over Consumer : Unacked IDs stay in PEL\nfor crash recovery
@enduml
```

### 3. Crash Recovery: XAUTOCLAIM Cursor Loop

```plantuml
@startuml
title Crash Recovery — XAUTOCLAIM Cursor Loop (run_once step 3)

participant "Pod A" as A
participant "Pod B\nrun_once()" as B
database "Redis PEL" as PEL

== Pod A: reads, starts processing, then crashes ==
A -> PEL : XREADGROUP → msg_001 assigned to Pod A
note over A : Pod A crashes.\nmsg_001 never ACKed.\nSits in PEL, idle growing.

== Pod B: autoclaim cursor loop sweeps full PEL ==
loop cursor != "0-0" and no stall
    B -> PEL : XAUTOCLAIM group={group} consumer={B}\n  min_idle_time={ms} count={n} start_id={cursor}
    PEL --> B : (next_cursor, [reclaimed_msgs])
    B -> B : tps_in.record(n)\n_process_batch(reclaimed_msgs)
    B -> B : cursor = next_cursor
end
note right of PEL : Loop exits when cursor="0-0"\n(full PEL swept) or stall detected.\nmax_claim_passes caps iterations if set.

B -> PEL : XACK {stream} {group} msg_001
note right of PEL : msg_001 cleared from PEL
@enduml
```
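
The cursor loop in the diagram can be approximated in a few lines. In this sketch `autoclaim` is a hypothetical coroutine that wraps one XAUTOCLAIM call and returns `(next_cursor, messages)`, and a stall is assumed to mean that the cursor did not advance; the library may define a stall differently:

```python
async def sweep_pel(autoclaim, process_batch, max_passes=None) -> int:
    """Follow the XAUTOCLAIM cursor until '0-0', a stall, or max_passes."""
    cursor, passes, total = "0-0", 0, 0
    while max_passes is None or passes < max_passes:
        next_cursor, claimed = await autoclaim(cursor)
        passes += 1
        if claimed:
            await process_batch(claimed)
            total += len(claimed)
        if next_cursor == "0-0":
            break  # Redis reports that the whole PEL has been scanned
        if next_cursor == cursor:
            break  # stall: the cursor did not advance
        cursor = next_cursor
    return total  # number of reclaimed messages handed to process_batch
```

With `max_passes=1` the function makes a single XAUTOCLAIM call per invocation, which corresponds to the single-pass behavior mentioned in the configuration reference.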

### 4. Decode Error → Immediate DLQ

```plantuml
@startuml
title Decode Error — Immediate DLQ (run_once step 2)

participant "_process_batch()" as Batch
participant "RetryHandler" as Retry
participant "dlq_handler()" as DLQ
database "Redis PEL" as PEL

Batch -> Batch : serializer.decode(raw) — raises Exception
note right of Batch : Corrupt or wrong format.\nCannot retry — will always fail.

Batch -> Retry : send_to_dlq(msg, "decode_error")
Retry -> DLQ : dlq_handler(msg, "decode_error")
DLQ --> Retry : (done; dlq handler errors swallowed)

Batch -> PEL : XACK bad_msg_id
Batch -> Batch : total_dlq += 1
note right of PEL : Removed immediately.\nWill NOT re-enter consumer\nloop for more retries.
@enduml
```
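
In code, the decode-error path might look like the sketch below. It assumes JSON payloads and `(id, bytes)` pairs as input; `split_decodable` is a hypothetical helper, not the library's `_process_batch()`:

```python
import json


async def split_decodable(raw_messages, dlq_handler):
    """Decode (id, bytes) pairs and route failures to the DLQ handler.

    Returns (decoded, dead_ids); dead_ids are meant to be XACKed right away.
    """
    decoded, dead_ids = [], []
    for msg_id, raw in raw_messages:
        try:
            decoded.append((msg_id, json.loads(raw)))
        except ValueError:  # covers json.JSONDecodeError and UnicodeDecodeError
            try:
                await dlq_handler((msg_id, raw), "decode_error")
            except Exception:
                pass  # DLQ handler errors are swallowed, as in the diagram
            dead_ids.append(msg_id)
    return decoded, dead_ids
```

Acknowledging `dead_ids` even when the DLQ handler fails prevents a corrupt message from circulating in the PEL indefinitely.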

### 5. Poison Pill → DLQ After Max Deliveries

```plantuml
@startuml
title Poison Pill — DLQ After max_deliveries (run_once step 4)

participant "RetryHandler\nhandle_poison_pills()" as Retry
participant "dlq_handler()" as DLQ
database "Redis PEL" as PEL
database "Redis Stream" as Stream

note over PEL : msg_999 redelivered\ndelivery_count >= max_deliveries

Retry -> PEL : XPENDING_RANGE {stream} {group}\n  - + count={batch_size}
PEL --> Retry : [PendingEntry(id=msg_999, delivery_count=5)]

Retry -> Retry : filter: delivery_count >= max_deliveries

alt message exists in stream
    Retry -> Stream : XRANGE {stream} msg_999 msg_999 COUNT 1
    Stream --> Retry : raw bytes
    Retry -> Retry : try_decode(raw)
    Retry -> DLQ : dlq_handler(decoded_msg, "max_deliveries")
else message deleted from stream (XDEL)
    Retry -> Retry : log warning "missing from stream"\n(no DLQ call — nothing to forward)
end

Retry -> PEL : XACK {stream} {group} *all_poison_ids  (batched)
note right of PEL : Poison pills cleared.\nNever retried again.
@enduml
```
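
The filter step of the sweep is simple enough to show directly. `Pending` is a hypothetical mirror of the fields printed in the monitoring example (`id`, `consumer`, `delivery_count`), and `poison_ids` is a helper written for this illustration:

```python
from dataclasses import dataclass


@dataclass
class Pending:
    """Hypothetical mirror of one XPENDING entry."""
    id: str
    consumer: str
    delivery_count: int


def poison_ids(entries, max_deliveries: int = 3) -> list[str]:
    """Return the IDs whose delivery_count has reached max_deliveries."""
    return [e.id for e in entries if e.delivery_count >= max_deliveries]


entries = [Pending("1-0", "pod-a", 1), Pending("2-0", "pod-a", 3), Pending("3-0", "pod-b", 5)]
to_dlq = poison_ids(entries)  # IDs "2-0" and "3-0"
```

The resulting IDs are the ones forwarded to `dlq_handler` and then acknowledged in a single batched XACK.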

### 6. NOGROUP Auto-Recovery

```plantuml
@startuml
title NOGROUP Auto-Recovery

participant "run() loop" as Loop
participant "ConsumerGroup" as CG
database "Redis" as R
participant "run_once()" as Once

Loop -> Once : await run_once()
Once -> CG : ensure()  [key in _known → skipped]
Once -> R : XREADGROUP ...
R --> Once : ResponseError: NOGROUP ...
Once --> Loop : raises Exception("NOGROUP ...")

Loop -> Loop : "NOGROUP" in str(e) → True
Loop -> CG : reset()  [removes key from _known]
Loop -> Loop : sleep(1)

Loop -> Once : await run_once()  [next iteration]
Once -> CG : ensure()  [key not in _known → runs]
CG -> R : XGROUP CREATE {stream} {group} MKSTREAM
R --> CG : OK
CG -> CG : _known.add(key)
Once -> R : XREADGROUP ...  [normal processing resumes]
@enduml
```

### 7. Full run_once() Lifecycle with Metrics

```plantuml
@startuml
title Full run_once() Lifecycle

participant "run_once()" as Loop
participant "ConsumerGroup\nensure()" as CG
participant "StreamClient" as C
participant "RetryHandler" as Retry
database "Redis" as R

Loop -> CG : ensure(dlq_stream, dlq_group)
note right of CG : No-op if key in _known.\nRuns XGROUP CREATE on first call\nor after NOGROUP reset.

group Step 2: New messages
    Loop -> C : read(stream, group, worker, batch_size, block_ms)
    C -> R : XREADGROUP GROUP {group} {worker} COUNT {n} BLOCK {ms} STREAMS {stream} >
    R --> Loop : [StreamMessage, ...]
    Loop -> Loop : total_read += n\ntps_in.record(n)\n_process_batch(messages)
    note right of Loop : ACKed IDs → total_acked, tps_out.record(n)
end

group Step 3: Orphan recovery (cursor loop)
    loop cursor != "0-0"
        Loop -> C : autoclaim(stream, group, worker, min_idle_ms, batch_size, cursor)
        C -> R : XAUTOCLAIM min_idle={ms} count={n} start_id={cursor}
        R --> Loop : (next_cursor, [reclaimed StreamMessage])
        Loop -> Loop : total_read += n\ntps_in.record(n)\n_process_batch(claimed)
        Loop -> Loop : cursor = next_cursor
    end
end

group Step 4: Poison-pill sweep
    Loop -> Retry : handle_poison_pills() → int
    Retry -> R : XPENDING_RANGE → filter delivery_count >= max_deliveries
    Retry -> R : XRANGE per poison ID (fetch raw bytes)
    Retry -> Retry : dlq_handler(msg, "max_deliveries") per pill
    Retry -> R : XACK {stream} {group} *poison_ids  (batched)
    Retry --> Loop : count of pills processed
    Loop -> Loop : total_dlq += count
end
@enduml
```

---

## Running the Example

```bash
# terminal 1
docker run -p 6379:6379 redis

# terminal 2
python examples/basic_worker.py
```

---

## Development

### Setup

```bash
git clone <repo>
cd redis-stream-queue

python3 -m venv .venv
source .venv/bin/activate        # Windows: .venv\Scripts\activate

pip install -e ".[dev]"          # editable install + test deps
```

### Running Tests

Tests use [fakeredis](https://github.com/cunla/fakeredis-py) — no real Redis needed.

```bash
# all tests
pytest tests/

# single file
pytest tests/test_consumer.py

# single test with verbose output
pytest tests/test_consumer.py::test_poison_pill_goes_to_dlq -v -s

# with coverage
pip install pytest-cov
pytest tests/ --cov=src/redis_stream_queue --cov-report=term-missing
```

### Project Structure

```
src/
└── redis_stream_queue/
    ├── __init__.py     # public exports
    ├── client.py       # StreamClient — connection pool, all X* commands
    ├── producer.py     # StreamProducer — push + ProducerMetrics + all_metrics()
    ├── consumer.py     # StreamConsumer — main loop + all_metrics()
    ├── group.py        # ConsumerConfig + ConsumerGroup (class-level registry, stats, health)
    ├── message.py      # StreamMessage, PendingEntry, StreamStats, ConsumerInfo,
    │                   # ConsumerMetrics, ProducerMetrics, _TpsTracker
    ├── retry.py        # RetryHandler — poison-pill detection + DLQ routing
    ├── serializers.py  # Json / Msgpack / Pickle
    └── exceptions.py
tests/
    conftest.py         # shared fixtures: fake_redis, make_client, registry resets
    test_consumer.py    # consumer loop, metrics, all_metrics, NOGROUP recovery
    test_group.py       # stats, health check, pending details
    test_producer.py    # push, ensure_group, metrics, all_metrics
examples/
    basic_worker.py
```

### Known Limitations

- **`fetch_by_ids` N+1**: the poison-pill fetch issues one XRANGE call per ID. This is fine when a sweep finds only a handful of poison pills (< 10); very large poison batches would benefit from pipelining.
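
One possible shape for a pipelined fetch is sketched below. It assumes a single-node `redis.asyncio.Redis` client is passed in as `redis_client`; `fetch_by_ids_pipelined` is a hypothetical function and not part of the library, and cluster pipelines may need different handling:

```python
async def fetch_by_ids_pipelined(redis_client, stream: str, ids: list[str]) -> dict:
    """Fetch many stream entries in one round trip instead of one XRANGE per ID."""
    if not ids:
        return {}
    async with redis_client.pipeline(transaction=False) as pipe:
        for msg_id in ids:
            # Queued on the pipeline; nothing is sent until execute().
            pipe.xrange(stream, min=msg_id, max=msg_id, count=1)
        results = await pipe.execute()  # one reply list per queued XRANGE
    found = {}
    for msg_id, entries in zip(ids, results):
        if entries:  # an empty reply means the entry no longer exists in the stream
            found[msg_id] = entries[0][1]  # field/value mapping of the entry
    return found
```

IDs missing from the returned dict correspond to the "missing from stream" case, which the sweep logs and still acknowledges.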
