Metadata-Version: 2.4
Name: llm-stream-guard
Version: 0.1.2
Summary: Streaming keyword guard for LLM output.
Author-email: zjding <1095245867@qq.com>
License-Expression: MIT
Keywords: llm,streaming,guardrails,content-safety,sensitive-words,aho-corasick
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pyahocorasick>=2.0.0
Dynamic: license-file

# LLM Stream Guard

Provider-agnostic streaming keyword guard for LLM output.

`llm-stream-guard` does not call OpenAI, Anthropic, Agno, LangChain, or any other model framework. It only accepts an `AsyncIterable[str]`, checks the streamed text before it reaches the client, and emits safe `delta`, `blocked`, and `done` events.

License: MIT

## Why

LLM providers usually stream output in small chunks. A blocked phrase can be split across chunks:

```text
chunk 1: "hello hydr"
chunk 2: "angea world"
```

If `hydrangea` is blocked, this package emits only:

```text
hello 
```

Then it emits `BlockedEvent(word="hydrangea", ...)`. The blocked word is not flushed to the client.
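
To see why checking each chunk on its own is not enough, the sketch below runs a naive filter and a hold-back buffer over the two chunks above. It illustrates the general technique only and is not this package's implementation: `naive_filter` and `holdback_filter` are hypothetical names, and the code uses plain substring search instead of the `pyahocorasick` automaton the package depends on.

```python
import asyncio
from collections.abc import AsyncIterator


async def chunks() -> AsyncIterator[str]:
    yield "hello hydr"
    yield "angea world"


async def naive_filter(stream, blocked):
    """Check each chunk in isolation; misses words split across chunks."""
    out = []
    async for chunk in stream:
        if any(word in chunk for word in blocked):
            break
        out.append(chunk)
    return "".join(out)


async def holdback_filter(stream, blocked):
    """Hold back the last len(longest word) - 1 characters until more text arrives."""
    keep = max(len(word) for word in blocked) - 1
    buffer, out = "", []
    async for chunk in stream:
        buffer += chunk
        hits = [buffer.find(word) for word in blocked if word in buffer]
        if hits:
            out.append(buffer[: min(hits)])  # only the text before the earliest match
            return "".join(out), True
        safe_len = max(len(buffer) - keep, 0)
        out.append(buffer[:safe_len])  # cannot be part of a future match
        buffer = buffer[safe_len:]
    out.append(buffer)  # stream ended without a match: flush the tail
    return "".join(out), False


NAIVE = asyncio.run(naive_filter(chunks(), ["hydrangea"]))
SAFE, BLOCKED = asyncio.run(holdback_filter(chunks(), ["hydrangea"]))
print(repr(NAIVE))          # the blocked word leaks through
print(repr(SAFE), BLOCKED)  # only the text before the match is released
```

The hold-back length is the cost of this approach: the client sees text slightly later, by at most the length of the longest blocked word.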

## Install

```bash
pip install llm-stream-guard
```

or:

```bash
uv add llm-stream-guard
```

## Quick Start

Create a word list:

```text
# block_words.txt
hydrangea
violet comet
forbidden nebula
```

Use it with any async text stream:

```python
from llm_stream_guard import BlockedEvent, DeltaEvent, StreamGuard

guard = StreamGuard.from_file("block_words.txt")


async def guarded_response():
    # model_text_stream() and cancel_model come from your application.
    async for event in guard.wrap(model_text_stream(), on_block=cancel_model):
        if isinstance(event, DeltaEvent):
            yield event.text
        elif isinstance(event, BlockedEvent):
            yield {"type": "blocked", "word": event.word}
            break
```

## Core Idea

The package only needs this shape:

```python
from collections.abc import AsyncIterable


async def model_text_stream() -> AsyncIterable[str]:
    yield "hello "
    yield "world"
```

Most LLM frameworks stream event objects, not plain strings. Adapt them by extracting text deltas:

```python
async def text_stream_from_events(events):
    async for event in events:
        text = extract_text(event)
        if text:
            yield text
```
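
`extract_text` is left to the application. As one possible shape, the sketch below accepts plain strings, dictionaries, and objects with a `content` attribute, then runs the adapter over a few fake events. The event shapes and the `fake_events` generator are assumptions made for illustration; real frameworks will differ.

```python
import asyncio
from types import SimpleNamespace
from typing import Optional


def extract_text(event) -> Optional[str]:
    """Hypothetical helper: return the text delta of an event, or None."""
    if isinstance(event, str):
        return event or None
    if isinstance(event, dict):
        value = event.get("delta") or event.get("content")
    else:
        value = getattr(event, "content", None)
    return value if isinstance(value, str) and value else None


async def text_stream_from_events(events):
    async for event in events:
        text = extract_text(event)
        if text:
            yield text


async def fake_events():
    # Stand-in for a framework stream; these shapes are assumptions.
    yield SimpleNamespace(content="hello ")
    yield {"delta": "wor"}
    yield {"type": "tool_call"}          # no text: skipped
    yield SimpleNamespace(content=None)  # no text: skipped
    yield "ld"


async def collect():
    return [text async for text in text_stream_from_events(fake_events())]


CHUNKS = asyncio.run(collect())
print(CHUNKS)
```

Keeping the extraction logic in a plain function makes it easy to unit test without a model call.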

Then guard the text stream:

```python
guard = StreamGuard.from_file("block_words.txt")

async for event in guard.wrap(text_stream_from_events(events), on_block=cancel_upstream):
    ...
```

## Framework Adapters

This package intentionally does not depend on framework-specific event classes. Keep adapters in your application.

### Generic Object Events

If your framework emits objects with a `content` field:

```python
async def object_event_text_stream(events):
    async for event in events:
        text = getattr(event, "content", None)
        if isinstance(text, str) and text:
            yield text
```

### Dict Events

If your framework emits dictionaries:

```python
async def dict_event_text_stream(events):
    async for event in events:
        text = event.get("delta") or event.get("content")
        if isinstance(text, str) and text:
            yield text
```

### Agno

Agno's `agent.arun(..., stream=True)` yields streaming events. Extract the `content` field:

```python
async def agno_text_stream(agent, prompt):
    async for event in agent.arun(prompt, stream=True):
        text = getattr(event, "content", None)
        if isinstance(text, str) and text:
            yield text
```

Then:

```python
async for event in guard.wrap(agno_text_stream(agent, prompt), on_block=cancel_agent_stream):
    ...
```

### OpenAI-Compatible Chat Completions

For OpenAI-compatible SSE/chat-completion streams, extract the provider's text delta and yield strings:

```python
async def openai_compatible_text_stream(response_stream):
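    async for chunk in response_stream:
        if not chunk.choices:
            continue  # some providers send chunks with no choices, e.g. usage-only chunks
        text = chunk.choices[0].delta.content
        if text:
            yield text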
```

The guard layer stays the same:

```python
async for event in guard.wrap(openai_compatible_text_stream(response_stream)):
    ...
```

## Cancelling Upstream

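`on_block` is called when a blocked word is found. Use it to stop the upstream model call so it does not keep generating text that will never be sent.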

```python
async def cancel_model():
    await response_stream.aclose()


async for event in guard.wrap(model_text_stream(), on_block=cancel_model):
    ...
```

If your framework exposes a task:

```python
import asyncio

task = asyncio.create_task(run_model())


async def cancel_model():
    task.cancel()
```
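
Cancellation is easier to reason about when the callback also waits for the task to finish. The sketch below is self-contained and uses only `asyncio`. Here `run_model` is a stand-in producer that pushes chunks into a queue, which is an assumption about how an application might be wired, not something this package requires.

```python
import asyncio


async def run_model(queue: asyncio.Queue) -> None:
    """Stand-in producer; a real one would forward provider chunks."""
    for i in range(1000):
        await queue.put(f"chunk {i} ")
        await asyncio.sleep(0)


async def main() -> bool:
    queue: asyncio.Queue = asyncio.Queue(maxsize=4)
    task = asyncio.create_task(run_model(queue))

    async def cancel_model() -> None:
        task.cancel()
        try:
            await task  # wait until the producer has actually stopped
        except asyncio.CancelledError:
            pass

    await queue.get()     # consume one chunk, then pretend a block occurred
    await cancel_model()  # this is what would be passed as on_block
    return task.cancelled()


CANCELLED = asyncio.run(main())
print(CANCELLED)
```

Awaiting the cancelled task gives the producer a chance to run its cleanup before the request handler returns.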

## Events

### DeltaEvent

Safe text that can be sent to the client.

```python
event.type == "delta"
event.text
```

### BlockedEvent

A blocked word was detected.

```python
event.type == "blocked"
event.word
event.start
event.end
```

### DoneEvent

The stream finished cleanly.

```python
event.type == "done"
```
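
Because every event carries a `type` string, application code can serialize events without isinstance checks. The sketch below formats events as Server-Sent Events frames. `to_sse` and the `Fake*` dataclasses are hypothetical stand-ins that mirror only the attributes listed above, so the block runs without the package installed; the offsets are made-up values.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeDelta:  # stand-in with DeltaEvent's documented attributes
    text: str
    type: str = "delta"


@dataclass
class FakeBlocked:  # stand-in with BlockedEvent's documented attributes
    word: str
    start: int
    end: int
    type: str = "blocked"


@dataclass
class FakeDone:  # stand-in with DoneEvent's documented attributes
    type: str = "done"


def to_sse(event) -> str:
    """Hypothetical helper: turn one guard event into one SSE frame."""
    if event.type == "delta":
        payload = {"type": "delta", "text": event.text}
    elif event.type == "blocked":
        payload = {
            "type": "blocked",
            "word": event.word,
            "start": event.start,
            "end": event.end,
        }
    else:
        payload = {"type": "done"}
    return f"data: {json.dumps(payload)}\n\n"


FRAMES = [
    to_sse(FakeDelta("hello ")),
    to_sse(FakeBlocked("hydrangea", 6, 15)),  # offsets are illustrative only
    to_sse(FakeDone()),
]
print("".join(FRAMES), end="")
```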

## Word Lists

Load one file:

```python
guard = StreamGuard.from_file("block_words.txt")
```

Load multiple files:

```python
guard = StreamGuard.from_files([
    "policy/base.txt",
    "policy/custom.txt",
])
```

Load a directory of `.txt` files:

```python
guard = StreamGuard.from_directory("/path/to/Vocabulary", min_word_length=2)
```

`min_word_length` is useful for large third-party lexicons that may contain one-character words.
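
If you prefer to inspect a third-party lexicon before loading it, a small pre-cleaning step in your own code can help. The sketch below is an assumption about what such a step might look like, not a description of how the package parses files: `clean_lexicon` is a hypothetical helper, and treating `#` lines as comments is a choice made here.

```python
import tempfile
from pathlib import Path


def clean_lexicon(path: Path, min_word_length: int = 2) -> list[str]:
    """Hypothetical helper: strip, de-duplicate, and drop short entries."""
    words: list[str] = []
    seen: set[str] = set()
    for line in path.read_text(encoding="utf-8").splitlines():
        word = line.strip()
        if not word or word.startswith("#"):
            continue  # skip blank lines and comment lines
        if len(word) < min_word_length or word in seen:
            continue  # skip one-character entries and duplicates
        seen.add(word)
        words.append(word)
    return words


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "third_party.txt"
    source.write_text(
        "# sample lexicon\nhydrangea\n\na\nviolet comet\nhydrangea\n",
        encoding="utf-8",
    )
    WORDS = clean_lexicon(source)

print(WORDS)
```

Writing the cleaned list back to a `.txt` file keeps the result reviewable before it reaches production.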

## Normalization

Use `drop_separators=True` to match text with inserted spaces, punctuation, or different casing:

```python
guard = StreamGuard.from_file(
    "block_words.txt",
    drop_separators=True,
)
```

Example:

```text
word list: violetcomet
streamed text: VIOLET-comet
```

With `drop_separators=True`, this stream is blocked.
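
The package's exact normalization rules are not spelled out here. As a rough mental model only, the sketch below lowercases text and drops every character that is not a letter or digit. `normalize` is a hypothetical helper, not part of the package's API.

```python
def normalize(text: str) -> str:
    """Approximation: case-fold, then keep only letters and digits."""
    return "".join(ch for ch in text.casefold() if ch.isalnum())


SAMPLES = {
    "VIOLET-comet": normalize("VIOLET-comet"),
    "v i o l e t . c o m e t": normalize("v i o l e t . c o m e t"),
    "violet. Comet": normalize("violet. Comet"),
}

for raw, cleaned in SAMPLES.items():
    print(f"{raw!r} -> {cleaned!r}")
```

Under this model the third sample also collapses to `violetcomet`, even though the two words sit in different sentences. Separator-insensitive matching trades some false positives for resistance to simple evasion, so it is worth reviewing short entries in the word list before enabling it.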

## Recommended Service Pattern

Create `StreamGuard` once at service startup:

```python
guard = StreamGuard.from_file("block_words.txt", drop_separators=True)
```

Create a guarded stream per request:

```python
async def guarded_llm_stream(text_stream):
    async for event in guard.wrap(text_stream):
        if isinstance(event, DeltaEvent):
            yield {"type": "delta", "text": event.text}
        elif isinstance(event, BlockedEvent):
            yield {
                "type": "blocked",
                "word": event.word,
                "start": event.start,
                "end": event.end,
            }
            break
        else:
            yield {"type": "done"}
```

If you update the word list, create a new `StreamGuard`.
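
One way to pick up word-list changes without restarting the service is to rebuild the guard when the file's modification time changes. The sketch below is a minimal, single-process illustration under that assumption. `ReloadingGuard` is a hypothetical class, and `fake_factory` stands in for a real constructor such as `StreamGuard.from_file` so the block runs without the package installed.

```python
import os
import tempfile
from pathlib import Path
from typing import Any, Callable


class ReloadingGuard:
    """Rebuild a guard object when the word-list file's mtime changes."""

    def __init__(self, path: Path, factory: Callable[[str], Any]) -> None:
        self._path = path
        self._factory = factory
        self._mtime_ns = -1
        self._guard: Any = None

    def get(self) -> Any:
        mtime_ns = self._path.stat().st_mtime_ns
        if mtime_ns != self._mtime_ns:
            # Build the new guard fully before replacing the old reference.
            self._guard = self._factory(str(self._path))
            self._mtime_ns = mtime_ns
        return self._guard


def fake_factory(path: str) -> frozenset:
    # Stand-in for a real guard constructor; returns the set of words.
    return frozenset(Path(path).read_text(encoding="utf-8").split())


with tempfile.TemporaryDirectory() as tmp:
    words = Path(tmp) / "block_words.txt"
    words.write_text("hydrangea\n", encoding="utf-8")
    holder = ReloadingGuard(words, fake_factory)
    FIRST = holder.get()
    SAME = holder.get() is FIRST  # unchanged file: same object is reused

    words.write_text("hydrangea\nnebula\n", encoding="utf-8")
    stat = words.stat()
    # Force a visibly newer mtime so the demo does not depend on clock resolution.
    os.utime(words, ns=(stat.st_atime_ns, stat.st_mtime_ns + 1_000_000_000))
    SECOND = holder.get()

print(sorted(FIRST), SAME, sorted(SECOND))
```

Calling `holder.get()` once at the start of each request means a request keeps the guard it started with, while later requests see the updated list.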

## Local Development

```bash
uv run pytest python_tests
uv run python examples/basic_usage.py
uv run python scripts/test_wheel_package.py
```

Build:

```bash
uv build
```

The wheel install test creates a temporary environment, installs the built wheel, and runs usage tests against the installed package:

```bash
uv run python scripts/test_wheel_package.py
```

## Notes

- Only model output is checked. User input should be checked separately if needed.
- The package does not include semantic moderation.
- The package does not include your production word list.
- The package does not manage HTTP, SSE, WebSocket, or model provider clients.
- Keep provider/framework adapters in your application code.
