Metadata-Version: 2.4
Name: resumable-stream
Version: 0.1.0
Summary: Library for wrapping async streams to allow client resumption after connection loss
License-Expression: MIT
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: redis>=5.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: fakeredis>=2.20.0; extra == "dev"

# resumable-stream

A Python library for wrapping async streams to allow client resumption after connection loss.

This is a Python port of the [vercel/resumable-stream](https://github.com/vercel/resumable-stream) TypeScript library.

## Features

- **Resumable SSE streams**: Clients can resume streams after disconnection
- **Redis-based persistence**: Uses Redis pub/sub for cross-instance communication
- **Serverless-friendly**: Designed for environments without sticky load balancing
- **Low latency**: Minimal Redis operations for the common case (single `INCR` and `SUBSCRIBE` per stream)

## Installation

```bash
pip install resumable-stream
```

## Usage

### Idempotent API (Recommended)

The `resumable_stream` method automatically creates a new stream or resumes an existing one:

```python
import asyncio
from resumable_stream import create_resumable_stream_context

# Create context (uses REDIS_URL env var by default)
ctx = create_resumable_stream_context(redis_url="redis://localhost:6379")

async def my_stream():
    """Your actual stream producer."""
    for i in range(10):
        yield f"data: chunk {i}\n\n"
        await asyncio.sleep(0.5)

async def handle_request(stream_id: str, resume_at: int = None):
    stream = await ctx.resumable_stream(
        stream_id,
        my_stream,
        skip_characters=resume_at
    )
    
    if stream is None:
        # Stream already finished
        return {"status": 422, "body": "Stream is already done"}
    
    async for chunk in stream:
        yield chunk

# Use with FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream/{stream_id}")
async def stream_endpoint(stream_id: str, resume_at: int = None):
    return StreamingResponse(
        handle_request(stream_id, resume_at),
        media_type="text/event-stream"
    )
```

### Explicit Creation/Resumption

For more control, use separate methods for creating and resuming streams:

```python
# POST endpoint - create new stream
async def create_stream(stream_id: str):
    stream = await ctx.create_new_resumable_stream(stream_id, my_stream)
    if stream is None:
        return {"status": 422, "body": "Stream already exists"}
    
    async for chunk in stream:
        yield chunk

# GET endpoint - resume existing stream
async def resume_stream(stream_id: str, resume_at: int = None):
    stream = await ctx.resume_existing_stream(stream_id, resume_at)
    
    if stream == "NOT_FOUND":
        return {"status": 404, "body": "Stream not found"}
    if stream is None:
        return {"status": 422, "body": "Stream is already done"}
    
    async for chunk in stream:
        yield chunk
```

### Check Stream State

```python
state = await ctx.has_existing_stream(stream_id)

if state is None:
    print("No stream exists")
elif state is True:
    print("Stream is active")
elif state == "DONE":
    print("Stream is finished")
```

## How It Works

1. **First request**: When `resumable_stream` is called for a new `stream_id`, a producer stream is created
2. **Producer persistence**: The producer always completes the stream, even if the original reader disconnects
3. **Resume requests**: Additional consumers publish a message to request stream resumption
4. **Chunk delivery**: The producer sends buffered chunks and continues streaming to all consumers via Redis pub/sub

### Redis Key Structure

- `{prefix}:rs:sentinel:{streamId}` - Stream state ("1" = active, "DONE" = finished)
- `{prefix}:rs:request:{streamId}` - Channel for resume requests
- `{prefix}:rs:chunk:{listenerId}` - Channel for delivering chunks to consumers

## Configuration

```python
ctx = create_resumable_stream_context(
    key_prefix="resumable-stream",  # Redis key prefix
    redis_url="redis://localhost:6379",  # Or use REDIS_URL env var
    publisher=my_publisher,  # Custom Publisher implementation
    subscriber=my_subscriber,  # Custom Subscriber implementation
)
```

## Custom Redis Clients

You can provide custom `Publisher` and `Subscriber` implementations for non-standard Redis setups:

```python
from resumable_stream import Publisher, Subscriber

class MyPublisher:
    async def connect(self) -> None: ...
    async def publish(self, channel: str, message: str) -> int: ...
    async def set(self, key: str, value: str, *, ex: int = None) -> str: ...
    async def get(self, key: str) -> str | None: ...
    async def incr(self, key: str) -> int: ...

class MySubscriber:
    async def connect(self) -> None: ...
    async def subscribe(self, channel: str, callback) -> None: ...
    async def unsubscribe(self, channel: str) -> None: ...
```

## Environment Variables

- `REDIS_URL` or `KV_URL`: Redis connection URL (used if no explicit URL provided)
- `DEBUG`: Set to enable debug logging

## License

MIT
