# RoomKit

> Minimum version: 0.7.0 | Python 3.12+

> RoomKit is a pure async Python library for building multi-channel conversation
> systems. It provides room-based abstractions for managing conversations across
> SMS, Email, Voice, WebSocket, AI, and other channels with pluggable storage,
> identity resolution, hooks, and realtime events.
> Pydantic 2.x, fully typed, zero required dependencies beyond Pydantic.

RoomKit is a pure async Python library for building multi-channel conversation systems. It provides a **room-based abstraction** where conversations happen in rooms, participants communicate through channels, and hooks let you intercept and modify the flow.

## Key Concepts

- **Room** — A container for a conversation. Holds participants, channel bindings, and an ordered event timeline.
- **Channel** — A communication endpoint (SMS, Email, Voice, AI, WebSocket, etc.). Registered once, attached to many rooms.
- **Hook** — A function that intercepts events at specific points in the pipeline. Can block, modify, or observe messages.
- **Event Router** — Broadcasts events to all channels attached to a room, with content transcoding per channel capabilities.
- **Identity Pipeline** — Maps external sender IDs to known participants with challenge/response flows.
- **Realtime Events** — Ephemeral events (typing, presence, reactions) that are not stored in history.

## Architecture at a Glance

RoomKit uses a hub-and-spoke model. The `RoomKit` orchestrator sits at the center. Channels connect on the edges. Messages flow inbound through a defined pipeline, get stored, then broadcast outbound to all attached channels.

```
Inbound Message
  -> InboundRoomRouter.route()       # Find target room
  -> Channel.handle_inbound()        # Parse -> RoomEvent
  -> IdentityResolver.resolve()      # Identify sender
  -> BEFORE_BROADCAST hooks          # Can block/modify
  -> Store event
  -> EventRouter.broadcast()         # Deliver to all channels
    -> Content transcoding           # Adapt per channel capabilities
    -> Rate limiting + retry
  -> AFTER_BROADCAST hooks           # Async side effects
```

## What You Can Build

- **AI-powered support agents** across SMS, WhatsApp, and web chat
- **Voice assistants** with real-time STT/TTS and interruption handling
- **Multi-agent pipelines** where specialized agents hand off conversations
- **Notification systems** that bridge channels (SMS + Email + push)
- **Speech-to-speech AI** with Gemini Live, OpenAI Realtime, or Grok

## Design Principles

- **Async-first** — All I/O is async. No synchronous blocking.
- **Pluggable everything** — Storage, identity, routing, AI providers, voice backends — all swappable via ABCs with in-memory defaults.
- **Zero required deps** — Only `pydantic>=2.9`. Everything else is optional extras.
- **Type-safe** — Strict mypy, full type hints, Pydantic models throughout.
- **Python 3.12+** — Uses modern syntax (`X | None`, not `Optional[X]`).

---

## Basic Install

```bash
pip install roomkit
# or with uv
uv add roomkit
```

The core library has a single dependency: `pydantic>=2.9`.

## Optional Extras

RoomKit uses optional extras for provider-specific dependencies. Install only what you need:

### AI Providers

```bash
pip install roomkit[anthropic]    # Anthropic (Claude)
pip install roomkit[openai]       # OpenAI (GPT)
pip install roomkit[gemini]       # Google Gemini
pip install roomkit[mistral]      # Mistral AI
pip install roomkit[vllm]         # vLLM local inference (uses openai SDK)
pip install roomkit[azure]        # Azure OpenAI
```

### Voice — Speech-to-Text

```bash
pip install roomkit[deepgram]     # Deepgram STT (cloud)
pip install roomkit[sherpa-onnx]  # SherpaOnnx STT (local, offline)
pip install roomkit[gradium]      # Gradium STT
pip install roomkit[qwen-asr]     # Qwen3 ASR
```

### Voice — Text-to-Speech

```bash
pip install roomkit[elevenlabs]   # ElevenLabs TTS (cloud)
pip install roomkit[sherpa-onnx]  # SherpaOnnx TTS (local, offline)
pip install roomkit[gradium]      # Gradium TTS
pip install roomkit[qwen-tts]     # Qwen3 TTS
pip install roomkit[neutts]       # NeuTTS
```

### Voice — Backends

```bash
pip install roomkit[local-audio]  # Local mic/speaker (sounddevice + numpy)
pip install roomkit[fastrtc]      # FastRTC WebRTC backend
pip install roomkit[rtp]          # RTP backend
pip install roomkit[sip]          # SIP backend
pip install roomkit[webtransport] # WebTransport backend
```

### Voice — Pipeline

```bash
pip install roomkit[webrtc-aec]   # WebRTC echo cancellation
pip install roomkit[aicoustics]   # ai|coustics denoiser
pip install roomkit[smart-turn]   # ML-based turn detection
```

### Realtime Voice (Speech-to-Speech)

```bash
pip install roomkit[realtime-openai]   # OpenAI Realtime API
pip install roomkit[realtime-gemini]   # Google Gemini Live API
```

### Messaging Providers

```bash
pip install roomkit[twilio]            # Twilio SMS/RCS
pip install roomkit[telegram]          # Telegram Bot API
pip install roomkit[teams]             # Microsoft Teams (Bot Framework)
pip install roomkit[whatsapp-personal] # WhatsApp Personal (neonize)
pip install roomkit[websocket]         # WebSocket source
pip install roomkit[sse]               # Server-Sent Events source
```

### Storage & Infrastructure

```bash
pip install roomkit[postgres]          # PostgreSQL persistence (asyncpg)
pip install roomkit[mcp]               # Model Context Protocol tools
pip install roomkit[opentelemetry]     # OpenTelemetry tracing
```

### Meta Extras

```bash
pip install roomkit[providers]  # All AI + transport providers
pip install roomkit[sources]    # All event-driven sources
pip install roomkit[dev]        # Development (test + lint + type check)
```

## Environment Variables

Provider-specific API keys are passed via configuration objects, not environment variables. Example:

```python
from roomkit.providers.anthropic.config import AnthropicConfig

config = AnthropicConfig(api_key="sk-ant-...")
```

For voice providers, use lazy loaders to avoid import-time dependency checks:

```python
from roomkit.voice import get_deepgram_provider, get_deepgram_config

DeepgramSTTProvider = get_deepgram_provider()
DeepgramConfig = get_deepgram_config()

stt = DeepgramSTTProvider(DeepgramConfig(api_key="..."))
```

## Development Setup

```bash
git clone https://github.com/roomkit-live/roomkit
cd roomkit
uv sync --extra dev    # Install all dev dependencies
make all               # Run lint + typecheck + security + tests
```

---

A complete working example: two WebSocket users chatting with an AI assistant in a moderated room.

```python
from __future__ import annotations

import asyncio

from roomkit import (
    ChannelCategory,
    HookResult,
    HookTrigger,
    InboundMessage,
    RoomContext,
    RoomEvent,
    RoomKit,
    TextContent,
    WebSocketChannel,
)
from roomkit.channels.ai import AIChannel
from roomkit.providers.ai.mock import MockAIProvider


async def main() -> None:
    # 1. Create the framework instance
    kit = RoomKit()

    # 2. Create channels
    ws_alice = WebSocketChannel("ws-alice")
    ws_bob = WebSocketChannel("ws-bob")
    ai = AIChannel("ai-assistant", provider=MockAIProvider(responses=["Got it!"]))

    # 3. Register channels with the framework
    kit.register_channel(ws_alice)
    kit.register_channel(ws_bob)
    kit.register_channel(ai)

    # 4. Wire up receive callbacks (in production, WebSocket sends to clients)
    alice_inbox: list[RoomEvent] = []
    bob_inbox: list[RoomEvent] = []

    async def alice_recv(_conn: str, event: RoomEvent) -> None:
        alice_inbox.append(event)

    async def bob_recv(_conn: str, event: RoomEvent) -> None:
        bob_inbox.append(event)

    ws_alice.register_connection("alice-conn", alice_recv)
    ws_bob.register_connection("bob-conn", bob_recv)

    # 5. Create a room and attach channels
    await kit.create_room(room_id="demo-room")
    await kit.attach_channel("demo-room", "ws-alice")
    await kit.attach_channel("demo-room", "ws-bob")
    await kit.attach_channel(
        "demo-room", "ai-assistant", category=ChannelCategory.INTELLIGENCE
    )

    # 6. Add a BEFORE_BROADCAST hook for content moderation
    @kit.hook(HookTrigger.BEFORE_BROADCAST, name="profanity_filter")
    async def profanity_filter(event: RoomEvent, ctx: RoomContext) -> HookResult:
        if isinstance(event.content, TextContent) and "badword" in event.content.body:
            return HookResult.block("Message blocked by profanity filter")
        return HookResult.allow()

    # 7. Send messages through the inbound pipeline
    result = await kit.process_inbound(
        InboundMessage(
            channel_id="ws-alice",
            sender_id="alice",
            content=TextContent(body="Hello everyone!"),
        )
    )
    print(f"Alice sent 'Hello everyone!' -> blocked={result.blocked}")

    # Both Bob and the AI receive Alice's message.
    # The AI responds with "Got it!" which is broadcast to Alice and Bob.

    # 8. Query stored conversation history
    events = await kit.store.list_events("demo-room")
    for ev in events:
        print(f"  [{ev.source.channel_id}] {ev.content.body}")


if __name__ == "__main__":
    asyncio.run(main())
```

## What Just Happened

1. **RoomKit()** created the framework with in-memory defaults (store, locks, realtime).
2. **Channels** were registered globally, then **attached** to a room.
3. AI channel was attached with `category=ChannelCategory.INTELLIGENCE` — it receives messages and generates responses.
4. A **BEFORE_BROADCAST** hook runs synchronously before every broadcast. It can `block()`, `allow()`, or `modify()` events.
5. **process_inbound()** ran the full pipeline: route -> parse -> identity -> hooks -> store -> broadcast.
6. The AI's response was automatically routed back through the same pipeline (with chain depth tracking to prevent loops).

## Core Pattern

Every RoomKit application follows this pattern:

```python
from roomkit import RoomKit

# 1. Create framework
kit = RoomKit()

# 2. Register channels
kit.register_channel(channel)

# 3. Create rooms and attach channels
await kit.create_room(room_id="my-room")
await kit.attach_channel("my-room", "channel-id")

# 4. Process inbound messages
await kit.process_inbound(InboundMessage(...))
```

## Next Steps

- Add hooks for moderation, logging, or analytics (see Hooks)
- Configure AI providers for real LLM responses (see AI Channels)
- Add voice with STT/TTS (see Voice Channels)
- Set up multi-agent orchestration (see Orchestration)
- Deploy with PostgreSQL storage (see Storage)

---

## Hub-and-Spoke Model

RoomKit uses a hub-and-spoke architecture. The `RoomKit` class is the central hub. Channels are the spokes. Messages flow in through channels, get processed by the hub, then broadcast out to all channels attached to the room.

```
                    +------------------+
                    |     RoomKit      |
                    |  (orchestrator)  |
                    +--------+---------+
                             |
         +-------------------+-------------------+
         |         |         |         |         |
     +---+---+ +---+---+ +--+---+ +---+---+ +---+---+
     |  SMS  | | Email | | Voice| |  AI   | |  WS   |
     +-------+ +-------+ +------+ +-------+ +-------+
```

## Inbound Pipeline

Every inbound message follows an immutable pipeline order:

```
1. InboundRoomRouter.route()        # Find target room by channel binding
2. Channel.handle_inbound()         # Parse external format -> RoomEvent
3. IdentityResolver.resolve()       # Map sender_id -> participant
4. Identity hooks                   # ON_IDENTITY_AMBIGUOUS / ON_IDENTITY_UNKNOWN
5. Room lock acquired               # Per-room atomic processing
6. Idempotency check                # Deduplicate by provider_message_id
7. BEFORE_BROADCAST hooks           # Sync: can block or modify the event
8. Store event + update counters    # Persist to ConversationStore
9. EventRouter.broadcast()          # Deliver to all attached channels
   -> Content transcoding           # Adapt content per channel capabilities
   -> Rate limiting                 # TokenBucketRateLimiter per binding
   -> Retry with backoff            # RetryPolicy per binding
10. AFTER_BROADCAST hooks           # Async: fire-and-forget side effects
11. Room lock released
```

This order is defined in the RFC (Section 10.1) and must not be reordered.
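Step 6 (idempotency) can be pictured with a small standalone sketch. The `IdempotencyGuard` class below is purely illustrative, not RoomKit's implementation; only the `provider_message_id` concept comes from the pipeline above.

```python
# Illustrative sketch of step 6: deduplicate inbound messages by
# (room_id, provider_message_id). Hypothetical helper, not RoomKit code.
class IdempotencyGuard:
    def __init__(self) -> None:
        self._seen: set[tuple[str, str]] = set()

    def first_delivery(self, room_id: str, provider_message_id: str) -> bool:
        """True the first time a message is seen, False for duplicates."""
        key = (room_id, provider_message_id)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


guard = IdempotencyGuard()
assert guard.first_delivery("room-1", "SM123")       # processed
assert not guard.first_delivery("room-1", "SM123")   # webhook retry dropped
assert guard.first_delivery("room-2", "SM123")       # scoped per room
```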

## Channel Categories

Channels have two categories:

- **TRANSPORT** — Push messages to users (SMS, Email, WebSocket, Voice). Default category.
- **INTELLIGENCE** — Generate responses (AI, agents). Receives broadcasts, responds through the inbound pipeline.

## Channel-to-AI Message Flow (Reentry Loop)

When you send a message from a transport channel, it flows to the AI and back automatically. Here's the complete flow:

```
1. Transport channel (SMS/WS/Voice) → kit.process_inbound(InboundMessage)
2. Inbound pipeline: route → parse → identity → hooks → store
3. EventRouter.broadcast() → delivers event to ALL attached channels
   ├── Transport channels: note delivery (no response)
   └── AI channel: calls LLM provider → generates response
       └── Returns ChannelOutput(response_events=[RoomEvent])
4. REENTRY LOOP: AI response re-enters as new inbound event
   ├── BEFORE_BROADCAST hooks run again (ConversationRouter stamps routing)
   ├── Event stored in timeline
   ├── Broadcast to all channels again
   │   ├── Transport channels: DELIVER the AI response to users
   │   └── Other AI channels: see the response (may generate follow-up)
   └── If follow-up AI responses exist → loop again (chain depth checked)
5. Chain depth limit (default max=5) → stops AI-to-AI infinite loops
6. AFTER_BROADCAST hooks fire (async side effects)
```

**Key insight**: The "pipeline" is a reentry loop — not a linear chain. AI responses go back through the same BEFORE_BROADCAST hooks, content transcoding, and broadcast cycle as user messages.

### Connecting Channel to AI — Minimal Example

```python
from roomkit import RoomKit, WebSocketChannel, ChannelCategory, InboundMessage, TextContent
from roomkit.channels.ai import AIChannel
from roomkit.providers.anthropic.ai import AnthropicAIProvider
from roomkit.providers.anthropic.config import AnthropicConfig

kit = RoomKit()

# Transport channel (user-facing)
ws = WebSocketChannel("ws-user")

async def on_recv(_conn: str, event) -> None:
    """Receives broadcasts; in production, forward to the client socket."""
    print(event)

ws.register_connection("conn-1", on_recv)
kit.register_channel(ws)

# Intelligence channel (AI)
ai = AIChannel("ai", provider=AnthropicAIProvider(AnthropicConfig(
    api_key="sk-ant-...", model="claude-sonnet-4-20250514",
)))
kit.register_channel(ai)

# Room wires them together
await kit.create_room(room_id="chat")
await kit.attach_channel("chat", "ws-user")  # TRANSPORT (default)
await kit.attach_channel("chat", "ai", category=ChannelCategory.INTELLIGENCE)

# User sends message → AI responds → response delivered to WebSocket
await kit.process_inbound(
    InboundMessage(channel_id="ws-user", sender_id="user", content=TextContent(body="Hi!"))
)
# on_recv callback fires with the AI's response
```

### Pipeline vs Pipeline

RoomKit uses "Pipeline" in two contexts — don't confuse them:

| Term | What It Is | Where |
|------|-----------|-------|
| **Inbound processing pipeline** | The 11-step message processing flow (route → parse → identity → hooks → store → broadcast) | `core/mixins/inbound_locked.py` |
| **Pipeline orchestration strategy** | A linear agent chain (triage → handler → resolver) for multi-agent handoffs | `from roomkit import Pipeline` |

The inbound pipeline processes every message. The Pipeline strategy controls which agent handles each turn.

## Pluggable Components

Every core component follows the ABC + default pattern:

| Component | ABC | Default | Purpose |
|-----------|-----|---------|---------|
| `ConversationStore` | `store/base.py` | `InMemoryStore` | Room, event, participant persistence |
| `RoomLockManager` | `core/locks.py` | `InMemoryLockManager` | Per-room atomic processing |
| `RealtimeBackend` | `realtime/base.py` | `InMemoryRealtime` | Ephemeral events (typing, presence) |
| `IdentityResolver` | `identity/base.py` | `None` (disabled) | Sender -> participant mapping |
| `InboundRoomRouter` | `core/inbound_router.py` | `DefaultInboundRoomRouter` | Route messages to rooms |

Replace any component at construction:

```python
from roomkit import RoomKit
from roomkit.store.postgres import PostgresStore

kit = RoomKit(store=PostgresStore("postgresql://..."))
```

## Room Lifecycle

```
ACTIVE -> PAUSED -> CLOSED -> ARCHIVED
            |         ^
            +---------+  (can close from paused)
```

- **ACTIVE** — Accepting messages, all channels active.
- **PAUSED** — Messages queued, channels paused. Auto-transition via `inactive_after_seconds`.
- **CLOSED** — No new messages. Auto-transition via `closed_after_seconds`.
- **ARCHIVED** — Terminal state, read-only.
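These rules can be encoded as a minimal table-driven sketch (a hypothetical helper, not RoomKit API). `ACTIVE -> CLOSED` is included because `close_room()` works on an active room; whether PAUSED can resume to ACTIVE is not stated above, so it is omitted.

```python
# Hypothetical helper encoding the lifecycle above; not part of RoomKit.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "ACTIVE": {"PAUSED", "CLOSED"},   # auto-pause, or close directly
    "PAUSED": {"CLOSED"},             # can close from paused
    "CLOSED": {"ARCHIVED"},           # archive a closed room
    "ARCHIVED": set(),                # terminal, read-only
}


def can_transition(current: str, target: str) -> bool:
    return target in ALLOWED_TRANSITIONS.get(current, set())


assert can_transition("ACTIVE", "PAUSED")
assert can_transition("PAUSED", "CLOSED")
assert not can_transition("ARCHIVED", "ACTIVE")
```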

## Event Model

Every message becomes a `RoomEvent` with:

- **content** — One of 11 content types (TextContent, RichContent, MediaContent, AudioContent, VideoContent, LocationContent, CompositeContent, TemplateContent, EditContent, DeleteContent, SystemContent)
- **source** — Who sent it (channel_id, participant_id, direction)
- **index** — Sequential, monotonically increasing per room
- **metadata** — Arbitrary key-value data
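The per-room `index` invariant can be sketched with a toy counter. This is illustrative only; RoomKit's store assigns indices internally, and the 0-based start here is an assumption.

```python
from collections import defaultdict


class RoomIndexCounter:
    """Toy per-room sequence: each room's events get 0, 1, 2, ..."""

    def __init__(self) -> None:
        self._next: defaultdict[str, int] = defaultdict(int)

    def assign(self, room_id: str) -> int:
        index = self._next[room_id]
        self._next[room_id] += 1
        return index


counter = RoomIndexCounter()
# Indices are sequential within a room, independent across rooms.
assert [counter.assign("a"), counter.assign("a"), counter.assign("b")] == [0, 1, 0]
```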

## Voice Architecture

The voice subsystem has three layers:

1. **VoiceBackend** — Pure audio transport (mic, SIP, RTP, WebRTC). No speech detection.
2. **AudioPipeline** — Processes audio frames: Resampler -> Recorder -> AEC -> AGC -> Denoiser -> VAD -> Diarization.
3. **VoiceChannel** — Wires backend -> pipeline -> STT/TTS, handles interruption and turn detection.

```
Inbound:   Backend -> [Resampler] -> [Recorder] -> [AEC] -> [AGC] -> [Denoiser] -> VAD -> [Diarization] + [DTMF]
Outbound:  TTS -> [PostProcessors] -> [Recorder] -> AEC.feed_reference -> [Resampler] -> Backend
```

## Realtime Voice Architecture

Speech-to-speech AI (Gemini Live, OpenAI Realtime) bypasses STT/TTS entirely:

- **RealtimeVoiceProvider** — Handles the AI model connection (WebSocket to Gemini/OpenAI).
- **RealtimeAudioTransport** — Handles browser/client audio (WebSocket or WebRTC).
- **RealtimeVoiceChannel** — Bridges transport <-> provider with tool calling support.

---

## Creating Rooms

```python
from roomkit import RoomKit

kit = RoomKit()

# Auto-generated ID
room = await kit.create_room()

# Explicit ID with metadata
room = await kit.create_room(
    room_id="support-123",
    metadata={"topic": "billing", "priority": "high"},
)
```

## Room Lifecycle

```python
from roomkit import RoomKit

kit = RoomKit()

# Create room with auto-pause and auto-close timers
room = await kit.create_room(room_id="session-1")

# Update metadata
await kit.update_room_metadata("session-1", {"status": "escalated"})

# Close a room
await kit.close_room("session-1")

# Check timers (call periodically, e.g. every 60s)
transitioned = await kit.check_all_timers()
```

## Channel Types

RoomKit ships with these channel types:

| Channel | Factory / Class | Category | Use Case |
|---------|----------------|----------|----------|
| SMS | `SMSChannel(id, provider=...)` | TRANSPORT | Text messages via Twilio, Telnyx, Sinch |
| RCS | `RCSChannel(id, provider=...)` | TRANSPORT | Rich messaging via Twilio, Telnyx |
| Email | `EmailChannel(id, provider=...)` | TRANSPORT | Email via ElasticEmail, SendGrid |
| WhatsApp | `WhatsAppChannel(id, provider=...)` | TRANSPORT | WhatsApp Business API |
| WhatsApp Personal | `WhatsAppPersonalChannel(id, provider=...)` | TRANSPORT | WhatsApp via neonize |
| Messenger | `MessengerChannel(id, provider=...)` | TRANSPORT | Facebook Messenger |
| Telegram | `TelegramChannel(id, provider=...)` | TRANSPORT | Telegram Bot API |
| Teams | `TeamsChannel(id, provider=...)` | TRANSPORT | Microsoft Teams Bot Framework |
| HTTP | `HTTPChannel(id, provider=...)` | TRANSPORT | Generic webhook |
| WebSocket | `WebSocketChannel(id)` | TRANSPORT | Real-time bidirectional |
| AI | `AIChannel(id, provider=...)` | INTELLIGENCE | LLM responses |
| Agent | `Agent(id, provider=...)` | INTELLIGENCE | Agent with tools + greeting |
| Voice | `VoiceChannel(id, stt=..., tts=..., backend=...)` | TRANSPORT | Real-time audio |
| Realtime Voice | `RealtimeVoiceChannel(id, provider=...)` | INTELLIGENCE | Speech-to-speech AI |

## Registering and Attaching Channels

Channels are registered globally, then attached to specific rooms:

```python
from roomkit import RoomKit, SMSChannel, ChannelCategory
from roomkit.channels.ai import AIChannel
from roomkit.providers.twilio.sms import TwilioSMSProvider
from roomkit.providers.twilio.config import TwilioConfig
from roomkit.providers.anthropic.ai import AnthropicAIProvider
from roomkit.providers.anthropic.config import AnthropicConfig

kit = RoomKit()

# Create and register channels
sms = SMSChannel("sms-main", provider=TwilioSMSProvider(TwilioConfig(
    account_sid="AC...",
    auth_token="...",
    from_number="+1234567890",
)))
ai = AIChannel("ai-agent", provider=AnthropicAIProvider(AnthropicConfig(
    api_key="sk-ant-...",
    model="claude-sonnet-4-20250514",
)))

kit.register_channel(sms)
kit.register_channel(ai)

# Create room and attach
await kit.create_room(room_id="support-room")
await kit.attach_channel("support-room", "sms-main")
await kit.attach_channel("support-room", "ai-agent", category=ChannelCategory.INTELLIGENCE)
```

## Channel Access Levels

Control what a channel can do in a room:

```python
from roomkit import Access

# Default: read and write
await kit.attach_channel("room", "channel", access=Access.READ_WRITE)

# Read only: receives messages but cannot send
await kit.attach_channel("room", "channel", access=Access.READ_ONLY)

# Write only: sends messages but doesn't receive broadcasts
await kit.attach_channel("room", "channel", access=Access.WRITE_ONLY)

# None: temporarily disabled
await kit.set_access("room", "channel", Access.NONE)
```

## Muting Channels

Muting suppresses a channel's output (AI responses) without detaching it. Side effects (tasks, observations) still fire.

```python
# Mute AI output
await kit.mute("room", "ai-agent")

# Unmute
await kit.unmute("room", "ai-agent")

# Mute only the output direction (AI won't respond, but still sees messages)
await kit.mute_output("room", "ai-agent")
await kit.unmute_output("room", "ai-agent")
```

## Per-Room Channel Configuration

Pass metadata when attaching to customize per-room behavior:

```python
await kit.attach_channel(
    "weather-room",
    "ai-agent",
    category=ChannelCategory.INTELLIGENCE,
    metadata={
        "system_prompt": "You are a weather assistant.",
        "temperature": 0.3,
        "tools": [
            {
                "name": "get_weather",
                "description": "Get current weather for a city",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "city": {"type": "string"},
                    },
                    "required": ["city"],
                },
            },
        ],
    },
)
```

## Content Types

Events carry typed content. RoomKit supports 11 content types, which together form a discriminated union on the `type` field:

| Type | `type` Literal | Key Fields | Use Case |
|------|---------------|-----------|----------|
| `TextContent` | `"text"` | `body`, `language` | Plain text messages |
| `RichContent` | `"rich"` | `body`, `format` (html/markdown), `plain_text`, `buttons`, `cards`, `quick_replies` | Formatted messages with UI elements |
| `MediaContent` | `"media"` | `url`, `mime_type`, `filename`, `size_bytes`, `caption` | Images, documents, files |
| `AudioContent` | `"audio"` | `url`, `mime_type`, `duration_seconds`, `transcript` | Voice messages |
| `VideoContent` | `"video"` | `url`, `mime_type`, `duration_seconds`, `thumbnail_url` | Video messages |
| `LocationContent` | `"location"` | `latitude`, `longitude`, `label`, `address` | Geographic coordinates |
| `CompositeContent` | `"composite"` | `parts` (list of EventContent, max depth 5) | Multi-part messages |
| `TemplateContent` | `"template"` | `template_id`, `language`, `parameters`, `body` | WhatsApp Business / RCS templates |
| `SystemContent` | `"system"` | `body`, `code`, `data` (dict) | System-generated events |
| `EditContent` | `"edit"` | `target_event_id`, `new_content`, `edit_source` | Message edits |
| `DeleteContent` | `"delete"` | `target_event_id`, `delete_type`, `reason` | Message deletion |

```python
from roomkit.models.event import (
    TextContent,
    RichContent,
    MediaContent,
    AudioContent,
    VideoContent,
    LocationContent,
    CompositeContent,
    TemplateContent,
    SystemContent,
    EditContent,
    DeleteContent,
)

# Plain text
TextContent(body="Hello!")

# Rich content with buttons and Markdown
RichContent(
    body="Choose an option:",
    format="markdown",
    plain_text="Choose an option:",
    buttons=[{"text": "Option A", "payload": "a"}, {"text": "Option B", "payload": "b"}],
)

# Media attachment
MediaContent(url="https://example.com/image.png", mime_type="image/png", caption="Product photo")

# Audio with transcript
AudioContent(url="https://example.com/voice.ogg", mime_type="audio/ogg", transcript="Hello there")

# Location
LocationContent(latitude=40.7128, longitude=-74.0060, label="NYC Office", address="New York, NY")

# Template (WhatsApp Business)
TemplateContent(template_id="order_confirmation", language="en", parameters={"1": "ORD-123"})

# Multi-part message (max depth 5)
CompositeContent(parts=[
    TextContent(body="Here's the photo:"),
    MediaContent(url="https://example.com/photo.jpg", mime_type="image/jpeg"),
])

# Edit a previous message
EditContent(target_event_id="evt-abc", new_content=TextContent(body="Corrected text"))

# Delete a message
DeleteContent(target_event_id="evt-abc", delete_type="sender", reason="User retracted")
```

## Content Transcoding

When broadcasting events, the EventRouter automatically **transcodes** content to match each target channel's capabilities. A channel that only supports TEXT will receive a text fallback of a RichContent message.

### How Transcoding Works

1. Event broadcast starts → EventRouter iterates over all target channel bindings
2. For each target, call `ContentTranscoder.transcode(content, source_binding, target_binding)`
3. Transcoder checks `target_binding.capabilities.media_types` to decide if content passes through or needs conversion
4. If transcode returns `None` → delivery is skipped for that channel (content cannot be represented)

### Channel Capabilities

Each channel binding declares what content types it supports via `ChannelCapabilities`:

```python
from roomkit.models.channel import ChannelCapabilities
from roomkit.models.enums import ChannelMediaType

# SMS channel: text only, 160 char limit
sms_caps = ChannelCapabilities(
    media_types=[ChannelMediaType.TEXT],
    max_length=160,
)

# WebSocket channel: rich content, media, audio, video
ws_caps = ChannelCapabilities(
    media_types=[
        ChannelMediaType.TEXT,
        ChannelMediaType.RICH,
        ChannelMediaType.MEDIA,
        ChannelMediaType.AUDIO,
        ChannelMediaType.VIDEO,
        ChannelMediaType.LOCATION,
    ],
    supports_edit=True,
    supports_delete=True,
    supports_reactions=True,
    supports_typing=True,
)
```

`ChannelMediaType` enum values: `TEXT`, `RICH`, `MEDIA`, `AUDIO`, `VIDEO`, `LOCATION`, `TEMPLATE`.

### Default Fallback Chain

The `DefaultContentTranscoder` applies these fallback rules:

| Content Type | If Target Supports It | Fallback |
|-------------|----------------------|----------|
| `TextContent` | Always passes through | — |
| `RichContent` | RICH in media_types → pass | `TextContent(plain_text or body)` |
| `MediaContent` | MEDIA in media_types → pass | `TextContent("[Media: {caption or filename or url}]")` |
| `AudioContent` | AUDIO in media_types → pass | `TextContent(transcript)` or `"[Voice message: {url}]"` |
| `VideoContent` | VIDEO in media_types → pass | `TextContent("[Video: {url}]")` |
| `LocationContent` | LOCATION in media_types → pass | `TextContent("[Location: {label} ({lat}, {lon})]")` |
| `CompositeContent` | Recursive transcode of all parts | If all parts become text → flatten to single TextContent |
| `TemplateContent` | TEMPLATE in media_types → pass | `TextContent(body or "[Template: {id}]")` |
| `EditContent` | `supports_edit` → pass | Transcode `new_content` + prefix "Correction:" |
| `DeleteContent` | `supports_delete` → pass | `TextContent("[Message deleted]")` |

### Custom Content Transcoder

Implement the `ContentTranscoder` ABC to customize how content is adapted:

```python
from roomkit.core.router import ContentTranscoder
from roomkit.models.channel import ChannelBinding
from roomkit.models.event import EventContent, TextContent, MediaContent
from roomkit.models.enums import ChannelMediaType

class MyTranscoder(ContentTranscoder):
    async def transcode(
        self,
        content: EventContent,
        source_binding: ChannelBinding,
        target_binding: ChannelBinding,
    ) -> EventContent | None:
        target_types = target_binding.capabilities.media_types

        # Custom: convert images to descriptive text for voice channels
        if isinstance(content, MediaContent) and content.mime_type.startswith("image/"):
            if ChannelMediaType.MEDIA not in target_types:
                caption = content.caption or "an image"
                return TextContent(body=f"[Image received: {caption}]")

        # Custom: truncate long text for SMS
        if isinstance(content, TextContent):
            max_len = target_binding.capabilities.max_length
            if max_len and len(content.body) > max_len:
                return TextContent(body=content.body[: max_len - 3] + "...")

        # Fall through to default behavior for other types
        return content

# Override the default transcoder on the RoomKit instance
kit = RoomKit()
kit._transcoder = MyTranscoder()
```

### Multichannel Transcoding Example

A single event gets adapted differently for each channel:

```python
# User sends a rich message with an image from WebSocket
content = CompositeContent(parts=[
    RichContent(body="**Check this out!**", format="markdown", plain_text="Check this out!"),
    MediaContent(url="https://example.com/photo.jpg", mime_type="image/jpeg", caption="Sunset"),
])

# WebSocket channel: receives CompositeContent as-is (supports RICH + MEDIA)
# SMS channel: receives TextContent("Check this out!\n[Media: Sunset]")
# Voice channel (TTS): receives TextContent("Check this out!\n[Media: Sunset]")
```

## Detaching Channels

```python
await kit.detach_channel("room", "channel-id")
```

## Binding Metadata Updates

```python
await kit.update_binding_metadata("room", "ai-agent", {"temperature": 0.9})
```

## Querying Rooms

```python
# Get a room
room = await kit.get_room("room-id")

# List bindings
bindings = await kit.store.list_bindings("room-id")

# List participants
participants = await kit.store.list_participants("room-id")

# Query event timeline
events = await kit.get_timeline("room-id", offset=0, limit=50)
```

---

Hooks intercept events at specific points in the processing pipeline. They can block messages, modify content, trigger side effects, or observe the conversation.

## Hook Basics

```python
from roomkit import RoomKit, HookTrigger, HookExecution, HookResult, RoomEvent, RoomContext, TextContent

kit = RoomKit()

# Sync hook: runs BEFORE broadcast, can block or modify
@kit.hook(HookTrigger.BEFORE_BROADCAST)
async def content_filter(event: RoomEvent, ctx: RoomContext) -> HookResult:
    if isinstance(event.content, TextContent) and "spam" in event.content.body.lower():
        return HookResult.block("Spam detected")
    return HookResult.allow()

# Async hook: runs AFTER broadcast, fire-and-forget
@kit.hook(HookTrigger.AFTER_BROADCAST, execution=HookExecution.ASYNC)
async def log_event(event: RoomEvent, ctx: RoomContext) -> None:
    await analytics.track("message", {"room": event.room_id})
```

## HookResult

Sync hooks (BEFORE_BROADCAST) must return a `HookResult`:

```python
from roomkit import HookResult, TextContent

# Allow the event to proceed
HookResult.allow()

# Block the event with a reason
HookResult.block("Contains prohibited content")

# Modify the event before broadcast
modified = event.model_copy(update={"content": TextContent(body="[REDACTED]")})
HookResult.modify(modified)
```

## Hook Priority

Lower priority numbers run first. Default is 0.

```python
# Runs first (priority=0)
@kit.hook(HookTrigger.BEFORE_BROADCAST, name="profanity_filter", priority=0)
async def profanity_filter(event: RoomEvent, ctx: RoomContext) -> HookResult:
    blocked_words = {"badword", "spam", "scam"}
    if isinstance(event.content, TextContent):
        words = set(event.content.body.lower().split())
        if words & blocked_words:
            return HookResult.block(f"Blocked: {words & blocked_words}")
    return HookResult.allow()

# Runs second (priority=1)
@kit.hook(HookTrigger.BEFORE_BROADCAST, name="pii_redactor", priority=1)
async def pii_redactor(event: RoomEvent, ctx: RoomContext) -> HookResult:
    import re
    if isinstance(event.content, TextContent):
        redacted = re.sub(
            r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "[REDACTED]", event.content.body
        )
        if redacted != event.content.body:
            modified = event.model_copy(update={"content": TextContent(body=redacted)})
            return HookResult.modify(modified)
    return HookResult.allow()
```

## Hook Filters

Filter hooks by channel type, channel ID, or direction:

```python
from roomkit.models.enums import ChannelType, ChannelDirection

@kit.hook(
    HookTrigger.AFTER_BROADCAST,
    execution=HookExecution.ASYNC,
    channel_types={ChannelType.SMS},
    directions={ChannelDirection.INBOUND},
    priority=10,
)
async def sms_audit(event: RoomEvent, ctx: RoomContext) -> None:
    await audit_log.record(event)
```

## Room-Scoped Hooks

Add hooks to specific rooms instead of globally:

```python
from roomkit import HookExecution

await kit.add_room_hook(
    room_id="vip-room",
    trigger=HookTrigger.BEFORE_BROADCAST,
    execution=HookExecution.SYNC,
    fn=my_hook_function,
    name="vip_filter",
)

# Remove later
await kit.remove_room_hook("vip-room", "vip_filter")
```

## Complete Hook Trigger Reference

### Event Pipeline

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `BEFORE_BROADCAST` | SYNC | `(event, ctx) -> HookResult` | Before event is stored and broadcast. Can block/modify. |
| `AFTER_BROADCAST` | ASYNC | `(event, ctx) -> None` | After event is broadcast. Fire-and-forget side effects. |

### Channel Lifecycle

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `ON_CHANNEL_ATTACHED` | ASYNC | `(event, ctx) -> None` | Channel was attached to a room |
| `ON_CHANNEL_DETACHED` | ASYNC | `(event, ctx) -> None` | Channel was detached from a room |
| `ON_CHANNEL_MUTED` | ASYNC | `(event, ctx) -> None` | Channel was muted in a room |
| `ON_CHANNEL_UNMUTED` | ASYNC | `(event, ctx) -> None` | Channel was unmuted in a room |

### Room Lifecycle

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `ON_ROOM_CREATED` | ASYNC | `(event, ctx) -> None` | Room was created |
| `ON_ROOM_PAUSED` | ASYNC | `(event, ctx) -> None` | Room was paused |
| `ON_ROOM_CLOSED` | ASYNC | `(event, ctx) -> None` | Room was closed |

### Identity

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `ON_IDENTITY_AMBIGUOUS` | SYNC | `(event, ctx) -> IdentityHookResult` | Multiple identity matches found |
| `ON_IDENTITY_UNKNOWN` | SYNC | `(event, ctx) -> IdentityHookResult` | No identity match found |
| `ON_PARTICIPANT_IDENTIFIED` | ASYNC | `(event, ctx) -> None` | Participant was successfully identified |

### Delivery

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `ON_DELIVERY_STATUS` | ASYNC | `(status) -> None` | Delivery status update from provider |

### Side Effects

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `ON_TASK_CREATED` | ASYNC | `(event, ctx) -> None` | AI extracted a task from conversation |
| `ON_ERROR` | ASYNC | `(event, ctx) -> None` | Error occurred during processing |

### Voice

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `ON_SPEECH_START` | ASYNC | `(event, ctx) -> None` | VAD detected speech start |
| `ON_SPEECH_END` | ASYNC | `(event, ctx) -> None` | VAD detected speech end |
| `ON_TRANSCRIPTION` | ASYNC | `(event, ctx) -> None` | STT produced a transcription |
| `BEFORE_TTS` | SYNC | `(event, ctx) -> HookResult` | Before text is sent to TTS. Can block/modify. |
| `AFTER_TTS` | ASYNC | `(event, ctx) -> None` | After TTS audio is generated |

### Voice Pipeline

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `ON_VAD_SILENCE` | ASYNC | `(event, ctx) -> None` | VAD detected silence |
| `ON_VAD_AUDIO_LEVEL` | ASYNC | `(event, ctx) -> None` | Audio level update from VAD |
| `ON_SPEAKER_CHANGE` | ASYNC | `(event, ctx) -> None` | Diarization detected speaker change |
| `ON_BARGE_IN` | ASYNC | `(event, ctx) -> None` | User interrupted TTS playback |
| `ON_TTS_CANCELLED` | ASYNC | `(event, ctx) -> None` | TTS playback was cancelled |
| `ON_DTMF` | ASYNC | `(event, ctx) -> None` | DTMF tone detected |
| `ON_TURN_COMPLETE` | ASYNC | `(event, ctx) -> None` | Turn detector says turn is complete |
| `ON_TURN_INCOMPLETE` | ASYNC | `(event, ctx) -> None` | Turn detector says turn is incomplete |
| `ON_BACKCHANNEL` | ASYNC | `(event, ctx) -> None` | Backchannel detected (uh-huh, yeah) |
| `ON_RECORDING_STARTED` | ASYNC | `(event, ctx) -> None` | Audio recording started |
| `ON_RECORDING_STOPPED` | ASYNC | `(event, ctx) -> None` | Audio recording stopped |
| `ON_INPUT_AUDIO_LEVEL` | ASYNC | `(event, ctx) -> None` | Input audio level update |
| `ON_OUTPUT_AUDIO_LEVEL` | ASYNC | `(event, ctx) -> None` | Output audio level update |

### Tool Execution

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `ON_TOOL_CALL` | ASYNC | `(event, ctx) -> None` | AI invoked a tool (fires from AIChannel and RealtimeVoiceChannel) |

### Realtime Voice

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `ON_REALTIME_TEXT_INJECTED` | ASYNC | `(event, ctx) -> None` | Text was injected into realtime session |

### Observability

| Trigger | Execution | Signature | Description |
|---------|-----------|-----------|-------------|
| `ON_FEEDBACK` | ASYNC | `(Observation, ctx) -> None` | User submitted quality feedback via `kit.submit_feedback()` |

## Framework Events

Framework events are lightweight lifecycle notifications (not message events):

```python
@kit.on("room_created")
async def on_room_created(event):
    print(f"Room created: {event.data['room_id']}")

@kit.on("voice_session_started")
async def on_voice(event):
    print(f"Voice session: {event.data['session_id']}")
```

Available framework event types: `room_created`, `room_closed`, `room_paused`, `room_channel_attached`, `room_channel_detached`, `channel_connected`, `channel_disconnected`, `voice_session_started`, `voice_session_ended`, `source_attached`, `source_detached`, `source_error`, `source_exhausted`.
---

AIChannel connects rooms to LLM providers. When a message is broadcast to an AI channel, the channel generates a response from the conversation history and feeds that response back through the inbound pipeline.

## Basic Setup

```python
from roomkit import RoomKit, ChannelCategory
from roomkit.channels.ai import AIChannel
from roomkit.providers.anthropic.ai import AnthropicAIProvider
from roomkit.providers.anthropic.config import AnthropicConfig

kit = RoomKit()

ai = AIChannel(
    "ai-assistant",
    provider=AnthropicAIProvider(AnthropicConfig(
        api_key="sk-ant-...",
        model="claude-sonnet-4-20250514",
    )),
    system_prompt="You are a helpful customer support agent.",
    temperature=0.7,
)
kit.register_channel(ai)

await kit.create_room(room_id="support")
await kit.attach_channel("support", "ai-assistant", category=ChannelCategory.INTELLIGENCE)
```

## AI Providers

| Provider | Class | Config | Extra |
|----------|-------|--------|-------|
| Anthropic (Claude) | `AnthropicAIProvider` | `AnthropicConfig` | `roomkit[anthropic]` |
| OpenAI (GPT) | `OpenAIAIProvider` | `OpenAIConfig` | `roomkit[openai]` |
| Google Gemini | `GeminiAIProvider` | `GeminiConfig` | `roomkit[gemini]` |
| Mistral | `MistralAIProvider` | `MistralConfig` | `roomkit[mistral]` |
| Azure OpenAI | `AzureAIProvider` | `AzureAIConfig` | `roomkit[azure]` |
| vLLM (local) | `create_vllm_provider()` | `VLLMConfig` | `roomkit[vllm]` |
| Mock (testing) | `MockAIProvider` | — | built-in |

```python
# OpenAI
from roomkit.providers.openai.ai import OpenAIAIProvider
from roomkit.providers.openai.config import OpenAIConfig

provider = OpenAIAIProvider(OpenAIConfig(api_key="sk-...", model="gpt-4o"))

# Gemini
from roomkit.providers.gemini.ai import GeminiAIProvider
from roomkit.providers.gemini.config import GeminiConfig

provider = GeminiAIProvider(GeminiConfig(api_key="...", model="gemini-2.0-flash"))

# Mock (for testing)
from roomkit.providers.ai.mock import MockAIProvider

provider = MockAIProvider(responses=["Hello!", "How can I help?"])
```

## Agent Class

`Agent` extends `AIChannel` with role, description, greeting, and memory support — designed for multi-agent orchestration:

```python
from roomkit import Agent
from roomkit.providers.ai.mock import MockAIProvider

agent = Agent(
    "support-agent",
    provider=MockAIProvider(responses=["I can help with that."]),
    role="Customer support specialist",
    description="Handles billing and account questions",
    system_prompt="You are a support specialist. Be concise and helpful.",
    greeting="Hi! How can I help you today?",
)
```

## Tool Calling

Define tools as JSON schema and attach them to the AI channel:

```python
from roomkit import ChannelCategory
from roomkit.channels.ai import AIChannel
from roomkit.providers.openai.ai import OpenAIAIProvider
from roomkit.providers.openai.config import OpenAIConfig

ai = AIChannel(
    "ai-assistant",
    provider=OpenAIAIProvider(OpenAIConfig(api_key="sk-...", model="gpt-4o")),
    system_prompt="You help users check the weather.",
    tools=[
        {
            "name": "get_weather",
            "description": "Get current weather for a city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string", "description": "City name"},
                    "units": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                },
                "required": ["city"],
            },
        },
    ],
)
```

### Tool Handler

Register a handler to execute tool calls via the constructor:

```python
import json

async def handle_tools(name: str, arguments: dict) -> str:
    if name == "get_weather":
        city = arguments["city"]
        return json.dumps({"temperature": 22, "condition": "sunny", "city": city})
    return json.dumps({"error": "Unknown tool"})

ai = AIChannel(
    "ai-assistant",
    provider=provider,
    tools=[...],
    tool_handler=handle_tools,
)
```

### Tool Protocol (Tool ABC)

For structured tool definitions, use the `Tool` base class:

```python
from roomkit.tools.base import Tool

class GetWeather(Tool):
    name = "get_weather"
    description = "Get current weather for a city"
    parameters = {
        "type": "object",
        "properties": {
            "city": {"type": "string"},
        },
        "required": ["city"],
    }

    async def execute(self, arguments: dict) -> str:
        return '{"temperature": 22, "condition": "sunny"}'

ai = AIChannel("ai", provider=provider, tools=[GetWeather()])
```

### MCP Tool Provider

Integrate Model Context Protocol servers:

```python
from roomkit.tools.mcp import MCPToolProvider

mcp = MCPToolProvider(server_command=["uvx", "mcp-server-sqlite", "--db", "data.db"])
await mcp.initialize()

ai = AIChannel("ai", provider=provider, tools=mcp.tools())
```

## Per-Room Configuration

Override AI settings per room via binding metadata:

```python
await kit.attach_channel(
    "billing-room",
    "ai-agent",
    category=ChannelCategory.INTELLIGENCE,
    metadata={
        "system_prompt": "You are a billing specialist.",
        "temperature": 0.3,
        "tools": [...],
    },
)
```

## Streaming

AIChannel supports streaming responses to WebSocket clients:

```python
from roomkit import WebSocketChannel

ws = WebSocketChannel("ws-user")

async def on_recv(conn_id: str, msg) -> None:
    ...  # handle non-streamed messages

async def on_stream(conn_id: str, msg) -> None:
    # Receives StreamStart, StreamChunk, and StreamEnd messages
    print(f"Stream: {msg}")

# Register with stream support
ws.register_connection("conn-1", on_recv, stream_send_fn=on_stream)
```

## AI Thinking/Reasoning

Some providers support extended thinking:

```python
ai = AIChannel(
    "ai",
    provider=AnthropicAIProvider(AnthropicConfig(
        api_key="...",
        model="claude-sonnet-4-20250514",
    )),
    system_prompt="Think step by step.",
    thinking_budget=4096,  # Setting a budget enables thinking mode
)
```

## Vision Support

AI providers that support vision can process images sent as `MediaContent`:

```python
from roomkit import InboundMessage
from roomkit.models.event import MediaContent

await kit.process_inbound(
    InboundMessage(
        channel_id="ws-user",
        sender_id="user",
        content=MediaContent(url="https://example.com/chart.png", mime_type="image/png"),
    )
)
# AI sees the image and responds with analysis
```

## Tool Call Events

AIChannel automatically broadcasts ephemeral `TOOL_CALL_START` and `TOOL_CALL_END` events when executing tools. Subscribe to these for UI indicators:

```python
await kit.subscribe_room("room-1", my_callback)

# Callback receives:
# TOOL_CALL_START: {tool_calls: [{id, name, arguments}], round, channel_id}
# TOOL_CALL_END: {tool_calls: [{id, name, result}], round, channel_id, duration_ms}
```
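A sketch of a subscriber callback that turns these events into UI status lines — the `type`/`data` attribute names are assumptions about the ephemeral event shape, not a documented contract:

```python
def describe_tool_event(event_type: str, data: dict) -> str:
    """Render a TOOL_CALL_* payload as a one-line UI status string."""
    if event_type == "TOOL_CALL_START":
        names = ", ".join(call["name"] for call in data["tool_calls"])
        return f"Calling: {names}"
    if event_type == "TOOL_CALL_END":
        return f"Finished in {data['duration_ms']}ms"
    return ""

async def my_callback(event) -> None:
    # `event.type` / `event.data` are assumed attribute names; check
    # your RoomKit version's ephemeral event model.
    line = describe_tool_event(event.type, event.data)
    if line:
        print(line)
```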
---

VoiceChannel handles real-time audio conversations with speech-to-text, text-to-speech, and an audio processing pipeline.

## Basic Voice Setup

```python
from roomkit import RoomKit, VoiceChannel
from roomkit.voice.pipeline import AudioPipelineConfig, MockVADProvider, VADEvent, VADEventType
from roomkit.voice.stt.mock import MockSTTProvider
from roomkit.voice.tts.mock import MockTTSProvider
from roomkit.voice.backends.mock import MockVoiceBackend

kit = RoomKit()

# Create providers
backend = MockVoiceBackend()
stt = MockSTTProvider(transcriptions=["Hello, how can I help?"])
tts = MockTTSProvider()
vad = MockVADProvider(events=[
    VADEvent(type=VADEventType.SPEECH_START),
    None,
    VADEvent(type=VADEventType.SPEECH_END, audio_bytes=b"audio"),
])

# Create voice channel with pipeline
voice = VoiceChannel(
    "voice-agent",
    stt=stt,
    tts=tts,
    backend=backend,
    pipeline=AudioPipelineConfig(vad=vad),
)
kit.register_channel(voice)
```

## Joining a Voice Session

```python
# Create room and attach voice channel
await kit.create_room(room_id="call-room")
await kit.attach_channel("call-room", "voice-agent")

# Join a participant to the voice session
session = await kit.join(
    room_id="call-room",
    channel_id="voice-agent",
    participant_id="caller-1",
)

# Leave when done
await kit.leave(session)
```

## STT Providers

| Provider | Class | Config | Extra |
|----------|-------|--------|-------|
| Deepgram | `DeepgramSTTProvider` | `DeepgramConfig` | `roomkit[deepgram]` |
| SherpaOnnx | `SherpaOnnxSTTProvider` | `SherpaOnnxSTTConfig` | `roomkit[sherpa-onnx]` |
| Gradium | `GradiumSTTProvider` | `GradiumSTTConfig` | `roomkit[gradium]` |
| Qwen3 ASR | `Qwen3ASRProvider` | `Qwen3ASRConfig` | `roomkit[qwen-asr]` |
| Mock | `MockSTTProvider` | — | built-in |

Use lazy loaders to avoid import-time dependency checks:

```python
from roomkit.voice import get_deepgram_provider, get_deepgram_config

DeepgramSTTProvider = get_deepgram_provider()
DeepgramConfig = get_deepgram_config()

stt = DeepgramSTTProvider(DeepgramConfig(
    api_key="...",
    model="nova-2",
    language="en",
))
```

## TTS Providers

| Provider | Class | Config | Extra |
|----------|-------|--------|-------|
| ElevenLabs | `ElevenLabsTTSProvider` | `ElevenLabsConfig` | `roomkit[elevenlabs]` |
| SherpaOnnx | `SherpaOnnxTTSProvider` | `SherpaOnnxTTSConfig` | `roomkit[sherpa-onnx]` |
| Gradium | `GradiumTTSProvider` | `GradiumTTSConfig` | `roomkit[gradium]` |
| Qwen3 | `Qwen3TTSProvider` | `Qwen3TTSConfig` | `roomkit[qwen-tts]` |
| NeuTTS | `NeuTTSProvider` | `NeuTTSConfig` | `roomkit[neutts]` |
| Grok TTS | `GrokTTSProvider` | `GrokTTSConfig` | xAI |
| Mock | `MockTTSProvider` | — | built-in |

```python
from roomkit.voice import get_elevenlabs_provider, get_elevenlabs_config

ElevenLabsTTSProvider = get_elevenlabs_provider()
ElevenLabsConfig = get_elevenlabs_config()

tts = ElevenLabsTTSProvider(ElevenLabsConfig(
    api_key="...",
    voice_id="21m00Tcm4TlvDq8ikWAM",
    model="eleven_turbo_v2",
))
```

## Voice Backends

Backends handle audio transport between the framework and participants:

| Backend | Class | Extra | Use Case |
|---------|-------|-------|----------|
| Local mic/speaker | `LocalAudioBackend` | `roomkit[local-audio]` | Development/testing |
| FastRTC (WebRTC) | `FastRTCVoiceBackend` | `roomkit[fastrtc]` | Browser-based voice |
| RTP | `RTPVoiceBackend` | `roomkit[rtp]` | VoIP integration |
| SIP | `SIPVoiceBackend` | `roomkit[sip]` | Telephony |
| WebTransport | `WebTransportBackend` | `roomkit[webtransport]` | Low-latency web |
| Mock | `MockVoiceBackend` | built-in | Testing |

```python
from roomkit.voice import get_local_audio_backend

LocalAudioBackend = get_local_audio_backend()
backend = LocalAudioBackend(sample_rate=16000, channels=1)
```

## Interruption Handling

Four strategies for handling user speech during TTS playback:

```python
from roomkit.voice.interruption import InterruptionConfig, InterruptionStrategy

voice = VoiceChannel(
    "voice",
    stt=stt,
    tts=tts,
    backend=backend,
    pipeline=AudioPipelineConfig(vad=vad),
    interruption=InterruptionConfig(
        strategy=InterruptionStrategy.CONFIRMED,
        min_speech_ms=300,  # Wait 300ms of sustained speech before interrupting
    ),
)
```

| Strategy | Behavior |
|----------|----------|
| `IMMEDIATE` | Interrupt on any detected speech |
| `CONFIRMED` | Wait for sustained speech (min_speech_ms). Default. |
| `SEMANTIC` | Use BackchannelDetector to ignore "uh-huh", "yeah" |
| `DISABLED` | Never interrupt TTS playback |

## Voice Greeting

Send a greeting when a session starts:

```python
await kit.send_greeting(
    room_id="call-room",
    channel_id="voice-agent",
    greeting="Welcome! How can I help you today?",
    session=session,
)
```

Or configure on the Agent:

```python
from roomkit import Agent

agent = Agent(
    "voice-agent",
    provider=provider,
    greeting="Welcome! How can I help you today?",
    stt=stt,
    tts=tts,
    backend=backend,
    pipeline=AudioPipelineConfig(vad=vad),
)
```

## Voice Hooks

```python
from roomkit import HookTrigger, HookExecution

@kit.hook(HookTrigger.ON_SPEECH_START, execution=HookExecution.ASYNC)
async def on_speech(event, ctx):
    print("User started speaking")

@kit.hook(HookTrigger.ON_TRANSCRIPTION, execution=HookExecution.ASYNC)
async def on_transcription(event, ctx):
    print(f"Transcription: {event.content.body}")

@kit.hook(HookTrigger.BEFORE_TTS)
async def before_tts(event, ctx):
    # Can modify or block TTS text
    return HookResult.allow()

@kit.hook(HookTrigger.ON_BARGE_IN, execution=HookExecution.ASYNC)
async def on_barge_in(event, ctx):
    print("User interrupted the AI")
```

## Audio Bridging

Bridge audio between sessions for human-to-human voice calls:

```python
voice = VoiceChannel("voice", backend=backend, bridge=True)

# With bridge + STT for live transcription
voice = VoiceChannel("voice", stt=stt, backend=backend, bridge=True)
```

Audio bridge supports N-party calls with mixing and cross-rate resampling.
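The mixing step can be sketched in plain Python — an illustration of sum-and-clip mixing over 16-bit PCM, not RoomKit's actual bridge code (cross-rate resampling is omitted):

```python
import array

def mix_frames(frames: list[bytes]) -> bytes:
    """Mix equal-length 16-bit mono PCM frames by summing each sample
    position and clipping to the int16 range instead of wrapping."""
    tracks = [array.array("h") for _ in frames]
    for track, frame in zip(tracks, frames):
        track.frombytes(frame)
    mixed = array.array("h", (
        max(-32768, min(32767, sum(t[i] for t in tracks)))
        for i in range(len(tracks[0]))
    ))
    return mixed.tobytes()

a = array.array("h", [100, -200]).tobytes()
b = array.array("h", [50, 50]).tobytes()
mix_frames([a, b])  # mixed samples: [150, -150]
```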

## DTMF

Send and detect DTMF tones:

```python
# Send DTMF
await voice.send_dtmf(session, digit="1", duration_ms=160)

# Detect DTMF via hook
@kit.hook(HookTrigger.ON_DTMF, execution=HookExecution.ASYNC)
async def on_dtmf(event, ctx):
    print(f"DTMF digit: {event.data['digit']}")
```
---

The audio pipeline sits between the voice backend and STT/TTS, processing audio through pluggable stages.

## Pipeline Architecture

```
Inbound:   Backend -> [Resampler] -> [Recorder] -> [AEC] -> [AGC] -> [Denoiser] -> VAD -> [Diarization] + [DTMF]
Outbound:  TTS -> [PostProcessors] -> [Recorder] -> AEC.feed_reference -> [Resampler] -> Backend
```

All stages are optional except VAD (required for speech detection). Stages in brackets are skipped if not configured.

## Pipeline Configuration

```python
from roomkit import VoiceChannel
from roomkit.voice.pipeline import AudioPipelineConfig, VADConfig

pipeline = AudioPipelineConfig(
    resampler=my_resampler,           # Sample rate conversion
    vad=my_vad_provider,              # Voice activity detection (required)
    denoiser=my_denoiser,             # Background noise removal
    diarization=my_diarizer,          # Speaker identification
    aec=my_aec,                       # Echo cancellation
    agc=my_agc,                       # Automatic gain control
    dtmf=my_dtmf_detector,            # DTMF tone detection
    recorder=my_recorder,             # Audio recording
    recording_config=my_rec_config,   # Recording settings
    turn_detector=my_turn_detector,   # Turn-taking detection
    vad_config=VADConfig(
        silence_threshold_ms=500,     # Silence before speech_end
    ),
)

voice = VoiceChannel(
    "voice",
    stt=stt,
    tts=tts,
    backend=backend,
    pipeline=pipeline,
)
```

## Capability-Aware Skipping

AEC and AGC stages automatically skip when the backend declares native capabilities:

```python
from roomkit.voice.base import VoiceCapability

# If backend has NATIVE_AEC, pipeline skips the AEC stage
# If backend has NATIVE_AGC, pipeline skips the AGC stage
```

## Pipeline Stages Reference

### Resampler

Converts audio between sample rates (e.g., 8kHz SIP to 16kHz STT).

```python
from roomkit.voice.pipeline.resampler import LinearResamplerProvider

resampler = LinearResamplerProvider()
# Or use SincResampler for higher quality
```
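The idea behind linear resampling can be sketched independently of the provider (illustrative only, operating on decoded int16 sample lists rather than raw frames):

```python
def resample_linear(samples: list[int], src_rate: int, dst_rate: int) -> list[int]:
    """Resample by linear interpolation between neighboring samples."""
    if src_rate == dst_rate or not samples:
        return list(samples)
    out_len = int(len(samples) * dst_rate / src_rate)
    out = []
    for i in range(out_len):
        pos = i * src_rate / dst_rate       # position in the source signal
        j = int(pos)
        frac = pos - j
        a = samples[j]
        b = samples[min(j + 1, len(samples) - 1)]
        out.append(round(a + (b - a) * frac))  # interpolate between neighbors
    return out

resample_linear([0, 100], 8000, 16000)  # upsample 2x: [0, 50, 100, 100]
```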

### VAD (Voice Activity Detection)

Detects speech start/end events. Required for the pipeline.

```python
from roomkit.voice.pipeline import MockVADProvider, VADEvent, VADEventType, VADConfig

# Mock for testing
vad = MockVADProvider(events=[
    VADEvent(type=VADEventType.SPEECH_START),
    None,  # No event for this frame
    VADEvent(type=VADEventType.SPEECH_END, audio_bytes=b"speech-data"),
])

# Production: SherpaOnnx VAD (local, offline)
from roomkit.voice import get_sherpa_onnx_vad_provider, get_sherpa_onnx_vad_config

SherpaVAD = get_sherpa_onnx_vad_provider()
SherpaVADConfig = get_sherpa_onnx_vad_config()
vad = SherpaVAD(SherpaVADConfig(threshold=0.5))
```

### AEC (Acoustic Echo Cancellation)

Removes echo from the microphone signal caused by speaker output.

```python
# Speex AEC
from roomkit.voice import get_speex_aec_provider

SpeexAEC = get_speex_aec_provider()
aec = SpeexAEC(sample_rate=16000, frame_size=160, filter_length=1024)

# WebRTC AEC
# pip install roomkit[webrtc-aec]
```

The pipeline feeds TTS audio as reference to the AEC via `process_outbound()`.

### AGC (Automatic Gain Control)

Normalizes audio volume levels.

```python
from roomkit.voice.pipeline.agc import AGCConfig
from roomkit.voice.pipeline.agc.mock import MockAGCProvider

agc = MockAGCProvider()
```

### Denoiser

Removes background noise from audio.

```python
# RNNoise (local, CPU-based)
from roomkit.voice import get_rnnoise_denoiser_provider

RNNoise = get_rnnoise_denoiser_provider()
denoiser = RNNoise()

# ai|coustics Quail (cloud API)
# pip install roomkit[aicoustics]

# SherpaOnnx denoiser (local ONNX model)
from roomkit.voice import get_sherpa_onnx_denoiser_provider, get_sherpa_onnx_denoiser_config

SherpaDenoiser = get_sherpa_onnx_denoiser_provider()
SherpaDenoiserConfig = get_sherpa_onnx_denoiser_config()
denoiser = SherpaDenoiser(SherpaDenoiserConfig())
```

### Diarization

Identifies different speakers in multi-speaker audio.

```python
from roomkit.voice.pipeline.diarization.mock import MockDiarizationProvider

diarizer = MockDiarizationProvider()
```

### DTMF Detection

Detects dual-tone multi-frequency signals (phone keypad tones).

```python
from roomkit.voice.pipeline.dtmf.mock import MockDTMFDetector

dtmf = MockDTMFDetector()
```

DTMF runs in parallel with other pipeline stages (before AEC/AGC/denoiser).

### Audio Recorder

Records inbound and outbound audio.

```python
from roomkit.voice.pipeline.recorder.mock import MockAudioRecorder
from roomkit.voice.pipeline.recorder import RecordingConfig

recorder = MockAudioRecorder()
config = RecordingConfig(
    format="wav",
    sample_rate=16000,
    channels=1,
)
```

### Turn Detector

Determines when a speaker's turn is complete (post-STT):

```python
from roomkit.voice.pipeline.turn.mock import MockTurnDetector

turn = MockTurnDetector()

# Production: Smart turn detection (ML-based)
# pip install roomkit[smart-turn]
```

Turn detection accumulates transcription fragments until `is_complete=True`, then routes the combined text.

### Backchannel Detector

Classifies short utterances as backchannel (e.g., "uh-huh", "yeah") to prevent false interruptions:

```python
from roomkit.voice.pipeline.backchannel.mock import MockBackchannelDetector

bc = MockBackchannelDetector()
```

Used with `InterruptionStrategy.SEMANTIC`.
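A toy heuristic version of such a classifier — real detectors are typically model-based, and the word list here is an illustrative assumption:

```python
# Illustrative acknowledgement tokens, not RoomKit's actual list.
BACKCHANNELS = {"uh-huh", "mm-hmm", "yeah", "ok", "okay", "right"}

def is_backchannel(utterance: str) -> bool:
    """Treat 1-2 word utterances made only of acknowledgement tokens as
    backchannel, so they don't trigger a TTS interruption."""
    words = utterance.lower().strip(".!?, ").split()
    return 0 < len(words) <= 2 and all(w in BACKCHANNELS for w in words)
```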

### Post-Processors

Custom audio transformations on outbound TTS audio:

```python
from roomkit.voice.pipeline.postprocessor.base import AudioPostProcessor
```

## Interruption Handling

The `InterruptionHandler` manages what happens when the user speaks during TTS playback:

```python
from roomkit.voice.interruption import InterruptionConfig, InterruptionStrategy

config = InterruptionConfig(
    strategy=InterruptionStrategy.CONFIRMED,
    min_speech_ms=300,
)
```

| Strategy | When to Use |
|----------|-------------|
| `IMMEDIATE` | Fast response, accept false positives |
| `CONFIRMED` | Balanced — waits for sustained speech |
| `SEMANTIC` | Ignore backchannel ("uh-huh") using BackchannelDetector |
| `DISABLED` | Never interrupt (e.g., announcements) |

## AudioFrame

Inbound audio is represented as `AudioFrame`:

```python
from roomkit.voice.audio_frame import AudioFrame

frame = AudioFrame(
    data=b"\x00" * 320,   # Raw PCM bytes
    sample_rate=16000,     # Hz
    channels=1,            # Mono
    sample_width=2,        # 16-bit PCM
    timestamp_ms=0.0,
)
```

Pipeline stages annotate `frame.metadata` as they run, under the `denoiser`, `vad`, `aec`, `agc`, `diarization`, and `dtmf` keys.

## Mock Providers for Testing

Every pipeline stage has a mock provider with pre-configured event sequences:

```python
from roomkit.voice.pipeline import (
    MockVADProvider,
    MockDenoiserProvider,
    MockDiarizationProvider,
    MockAGCProvider,
    MockAECProvider,
    MockDTMFDetector,
    MockAudioRecorder,
    MockTurnDetector,
    MockBackchannelDetector,
)
```

---

RealtimeVoiceChannel connects to speech-to-speech AI models that handle audio directly — no separate STT/TTS needed. The AI model receives audio and responds with audio.

## Basic Setup

```python
from roomkit import RoomKit, ChannelCategory
from roomkit.channels.realtime_voice import RealtimeVoiceChannel

kit = RoomKit()

# Gemini Live example
from roomkit.voice import get_gemini_live_provider

GeminiLiveProvider = get_gemini_live_provider()

realtime = RealtimeVoiceChannel(
    "realtime-voice",
    provider=GeminiLiveProvider(
        api_key="...",
        model="gemini-2.0-flash-live-001",
    ),
    system_prompt="You are a helpful voice assistant. Keep responses brief.",
)
kit.register_channel(realtime)

await kit.create_room(room_id="voice-room")
await kit.attach_channel("voice-room", "realtime-voice", category=ChannelCategory.INTELLIGENCE)
```

## Providers

| Provider | Class | Extra | Description |
|----------|-------|-------|-------------|
| Google Gemini Live | `GeminiLiveProvider` | `roomkit[realtime-gemini]` | Gemini 2.0 speech-to-speech |
| OpenAI Realtime | `OpenAIRealtimeProvider` | `roomkit[realtime-openai]` | GPT-4o realtime audio |
| xAI Grok | `XAIRealtimeProvider` | — | Grok speech-to-speech |
| Mock | `MockRealtimeProvider` | built-in | Testing |

```python
# OpenAI Realtime
from roomkit.voice import get_openai_realtime_provider

OpenAIRealtime = get_openai_realtime_provider()
provider = OpenAIRealtime(api_key="sk-...", model="gpt-4o-realtime-preview")

# xAI Grok
from roomkit.voice import get_xai_realtime_provider, get_xai_realtime_config

XAIRealtime = get_xai_realtime_provider()
XAIConfig = get_xai_realtime_config()
provider = XAIRealtime(XAIConfig(api_key="..."))
```

## Audio Transports

Transports handle the client-side audio (browser or device):

| Transport | Class | Extra | Use Case |
|-----------|-------|-------|----------|
| WebSocket | `WebSocketRealtimeTransport` | `roomkit[websocket]` | Browser via WebSocket |
| FastRTC (WebRTC) | `FastRTCRealtimeTransport` | `roomkit[fastrtc]` | Browser via WebRTC |
| Local mic/speaker | `LocalAudioBackend` | `roomkit[local-audio]` | Local development |
| Mock | `MockRealtimeTransport` | built-in | Testing |

```python
from roomkit.voice import get_websocket_realtime_transport

WSTransport = get_websocket_realtime_transport()
transport = WSTransport(host="0.0.0.0", port=8765)

realtime = RealtimeVoiceChannel(
    "realtime-voice",
    provider=provider,
    transport=transport,
    system_prompt="You are a helpful assistant.",
)
```

## Joining a Session

```python
session = await kit.join(
    room_id="voice-room",
    channel_id="realtime-voice",
    participant_id="caller-1",
)

# Leave when done
await kit.leave(session)
```

## Tool Calling

Realtime voice channels support tool calling during conversations:

```python
from roomkit.channels.realtime_voice import RealtimeVoiceChannel, ToolHandler

async def handle_tool(name: str, arguments: dict) -> str:
    if name == "get_weather":
        return '{"temperature": 22, "condition": "sunny"}'
    return '{"error": "unknown tool"}'

realtime = RealtimeVoiceChannel(
    "realtime-voice",
    provider=provider,
    system_prompt="You help with weather. Use the get_weather tool.",
    tools=[
        {
            "name": "get_weather",
            "description": "Get weather for a city",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    ],
    tool_handler=handle_tool,
)
```

## Text Injection

Inject text into an active realtime session (appears as if the AI said it):

```python
session = await kit.join("room", "realtime-voice", participant_id="user")
await realtime.inject_text(session, "Let me check that for you...")
```

## VAD Configuration

Configure voice activity detection for realtime providers:

```python
realtime = RealtimeVoiceChannel(
    "realtime-voice",
    provider=provider,
    system_prompt="...",
    vad_config={
        "threshold": 0.5,
        "silence_duration_ms": 500,
    },
)
```

## Session Resumption

Some providers support session resumption after disconnection:

```python
# OpenAI Realtime supports session resumption
provider = OpenAIRealtime(
    api_key="sk-...",
    model="gpt-4o-realtime-preview",
)
# Sessions automatically resume when possible
```

## Hooks

```python
from roomkit import HookTrigger, HookExecution

@kit.hook(HookTrigger.ON_TOOL_CALL, execution=HookExecution.ASYNC)
async def on_tool(event, ctx):
    print(f"Realtime tool call: {event}")

@kit.hook(HookTrigger.ON_REALTIME_TEXT_INJECTED, execution=HookExecution.ASYNC)
async def on_inject(event, ctx):
    print(f"Text injected: {event}")
```
---

RoomKit provides four declarative orchestration strategies for multi-agent workflows. Pass a strategy to `RoomKit(orchestration=...)` or `create_room(orchestration=...)` — agents, routing, handoff tools, and conversation state are wired automatically.

## Strategies

### Pipeline

Linear agent chain: triage -> handler -> resolver. Each agent can only hand off to the next in sequence.

```python
from roomkit import Agent, Pipeline, RoomKit, WebSocketChannel
from roomkit.providers.ai.mock import MockAIProvider

triage = Agent(
    "triage",
    provider=MockAIProvider(responses=["Transferring you..."]),
    role="Triage agent",
    description="Routes requests to the right specialist",
    system_prompt="You triage incoming requests.",
)
handler = Agent(
    "handler",
    provider=MockAIProvider(responses=["Let me help with that."]),
    role="Request handler",
    description="Handles customer requests",
    system_prompt="You handle requests.",
)
resolver = Agent(
    "resolver",
    provider=MockAIProvider(responses=["All done!"]),
    role="Resolution specialist",
    description="Confirms resolution",
    system_prompt="You resolve and close requests.",
)

kit = RoomKit(orchestration=Pipeline(agents=[triage, handler, resolver]))
```

### Swarm

Every agent can hand off to every other agent. Bidirectional routing.

```python
from roomkit import Swarm

kit = RoomKit(orchestration=Swarm(agents=[billing, shipping, returns]))
```

### Supervisor

A supervisor agent delegates tasks to worker agents in child rooms:

```python
from roomkit import Supervisor

kit = RoomKit(orchestration=Supervisor(
    supervisor=manager_agent,
    workers=[researcher, writer, reviewer],
))
```

### Loop

Producer/reviewer cycle. The reviewer has an `approve_output` tool to break the loop.

```python
from roomkit import Loop

kit = RoomKit(orchestration=Loop(
    agent=writer_agent,
    reviewer=editor_agent,
    max_iterations=3,
))
```

## Using Orchestration

```python
from roomkit import InboundMessage, TextContent, WebSocketChannel

# Register transport channel
ws = WebSocketChannel("ws-user")
kit.register_channel(ws)

# Create room — orchestration auto-registers agents, creates router, sets initial state
await kit.create_room(room_id="support")
await kit.attach_channel("support", "ws-user")

# Messages are automatically routed to the active agent
await kit.process_inbound(
    InboundMessage(
        channel_id="ws-user",
        sender_id="user",
        content=TextContent(body="I need help with billing."),
    )
)
```

## Handoff Protocol

Agents hand off conversations by calling the `handoff_conversation` tool (auto-injected by orchestration strategies):

```python
# The AI calls this tool automatically:
# handoff_conversation(target="handler", reason="Billing issue", summary="User needs invoice help")
```

The handoff:
1. Updates `ConversationState` in room metadata (phase, active agent, handoff count)
2. Mutes the outgoing agent, unmutes the incoming agent
3. Emits a system event visible to the new agent
4. Records the transition in `phase_history`
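Steps 1 and 4 reduce to a state update. A minimal self-contained sketch of that bookkeeping (dict-based and purely illustrative — the real update goes through `ConversationState`, and muting and the system event are separate side effects):

```python
def apply_handoff(state: dict, target: str, reason: str) -> dict:
    """Illustrative only: the metadata bookkeeping a handoff performs."""
    return {
        **state,
        "previous_agent_id": state["active_agent_id"],
        "active_agent_id": target,
        "handoff_count": state["handoff_count"] + 1,
        "phase_history": state["phase_history"]
        + [{"from_agent": state["active_agent_id"], "to_agent": target, "reason": reason}],
    }

state = {"active_agent_id": "triage", "previous_agent_id": None,
         "handoff_count": 0, "phase_history": []}
state = apply_handoff(state, "handler", "Billing issue")
print(state["active_agent_id"], state["handoff_count"])  # handler 1
```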

## Conversation State

`ConversationState` tracks conversation progress within a room. It's stored in `Room.metadata["_conversation_state"]` and persists across all message turns. Orchestration strategies create and update it automatically, but you can also read and modify it directly.

### ConversationState Fields

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `phase` | `str` | `"intake"` | Current conversation phase. Can be any string — not restricted to built-in phases. |
| `active_agent_id` | `str \| None` | `None` | Channel ID of the currently active agent. |
| `previous_agent_id` | `str \| None` | `None` | Agent that was active before the last transition. |
| `handoff_count` | `int` | `0` | Total number of agent handoffs. |
| `phase_started_at` | `datetime` | now | Timestamp when the current phase started. |
| `phase_history` | `list[PhaseTransition]` | `[]` | Immutable audit trail of all transitions. |
| `context` | `dict[str, Any]` | `{}` | **Arbitrary user data** — store any custom key-value pairs across turns. |

Built-in phase constants (use any string — these are convenience defaults):
`ConversationPhase.INTAKE`, `QUALIFICATION`, `HANDLING`, `ESCALATION`, `RESOLUTION`, `FOLLOWUP`.

### Reading State

```python
from roomkit.orchestration.state import get_conversation_state

room = await kit.get_room("support")
state = get_conversation_state(room)

print(state.phase)            # "handling"
print(state.active_agent_id)  # "billing-agent"
print(state.handoff_count)    # 2
print(state.context)          # {"customer_tier": "premium", "issue_type": "refund"}

# Phase transition history (audit trail)
for t in state.phase_history:
    print(f"{t.from_phase} -> {t.to_phase} ({t.from_agent} -> {t.to_agent}): {t.reason}")
```

### Persisting Custom Data Across Turns

Use `state.context` to store arbitrary data that survives across conversation turns. Update state via `set_conversation_state()` + `store.update_room()`:

```python
from roomkit.orchestration.state import get_conversation_state, set_conversation_state

# Read current state
room = await kit.get_room("support")
state = get_conversation_state(room)

# Store custom data in context (immutable update pattern)
updated_state = state.model_copy(update={
    "context": {
        **state.context,
        "customer_tier": "premium",
        "issue_type": "refund",
        "attempts": state.context.get("attempts", 0) + 1,
    }
})

# Persist back to room
updated_room = set_conversation_state(room, updated_state)
await kit.store.update_room(updated_room)
```

### Retrieving Custom Data on Later Turns

```python
# In a hook or handler on a subsequent turn:
room = await kit.get_room("support")
state = get_conversation_state(room)

tier = state.context.get("customer_tier", "standard")
attempts = state.context.get("attempts", 0)
```

### Programmatic Phase Transitions

Use `state.transition()` to change phase and record an audit entry. It returns a new immutable state:

```python
from roomkit.orchestration.state import get_conversation_state, set_conversation_state

room = await kit.get_room("support")
state = get_conversation_state(room)

# Transition to a new phase with reason and agent
new_state = state.transition(
    to_phase="escalation",
    to_agent="supervisor-agent",
    reason="Customer requested manager",
    metadata={"escalation_priority": "high"},
)
# new_state.phase == "escalation"
# new_state.active_agent_id == "supervisor-agent"
# new_state.previous_agent_id == <old agent>
# new_state.handoff_count incremented if agent changed

# Persist
updated_room = set_conversation_state(room, new_state)
await kit.store.update_room(updated_room)
```

### Using State in Hooks

A common pattern — persist data via a BEFORE_BROADCAST hook:

```python
from roomkit import HookTrigger, HookResult, RoomEvent, RoomContext, TextContent
from roomkit.orchestration.state import get_conversation_state, set_conversation_state

@kit.hook(HookTrigger.BEFORE_BROADCAST)
async def track_sentiment(event: RoomEvent, ctx: RoomContext) -> HookResult:
    if not isinstance(event.content, TextContent):
        return HookResult.allow()

    state = get_conversation_state(ctx.room)

    # Update context with per-turn data
    updated_state = state.model_copy(update={
        "context": {
            **state.context,
            "last_message_length": len(event.content.body),
            "message_count": state.context.get("message_count", 0) + 1,
        }
    })
    updated_room = set_conversation_state(ctx.room, updated_state)
    await kit.store.update_room(updated_room)

    return HookResult.allow()
```

### PhaseTransition Audit Record

Each transition creates an immutable `PhaseTransition`:

| Field | Type | Description |
|-------|------|-------------|
| `from_phase` | `str` | Previous phase |
| `to_phase` | `str` | New phase |
| `from_agent` | `str \| None` | Previous agent channel ID |
| `to_agent` | `str \| None` | New agent channel ID |
| `reason` | `str` | Why the transition occurred |
| `timestamp` | `datetime` | When the transition occurred |
| `metadata` | `dict[str, Any]` | Arbitrary metadata for this transition |

## Conversation Router (Advanced)

`ConversationRouter` dynamically routes incoming messages to different agents based on conversation state, message content, origin channel, or custom logic. It works as a BEFORE_BROADCAST hook that stamps routing metadata on events.

### RoutingConditions Reference

All conditions are ANDed — every non-None field must match for the rule to fire:

| Field | Type | Description |
|-------|------|-------------|
| `phases` | `set[str] \| None` | Match when `state.phase` is in this set |
| `channel_types` | `set[ChannelType] \| None` | Match when sender's channel type is in this set |
| `intents` | `set[str] \| None` | Match when `event.metadata["intent"]` is in this set |
| `source_channel_ids` | `set[str] \| None` | Match when sender's channel ID is in this set |
| `custom` | `Callable \| None` | Custom function `(event, context, state) -> bool` for arbitrary logic |

### Routing by Phase

```python
from roomkit.orchestration.router import ConversationRouter, RoutingRule, RoutingConditions

router = ConversationRouter(
    rules=[
        RoutingRule(
            agent_id="billing-agent",
            conditions=RoutingConditions(phases={"billing"}),
            priority=0,
        ),
        RoutingRule(
            agent_id="shipping-agent",
            conditions=RoutingConditions(phases={"shipping"}),
            priority=0,
        ),
    ],
    default_agent_id="triage-agent",
)
```

### Routing by Channel Type (Origin)

Route messages to different agents based on where they came from:

```python
from roomkit.models.enums import ChannelType

router = ConversationRouter(
    rules=[
        RoutingRule(
            agent_id="voice-specialist",
            conditions=RoutingConditions(channel_types={ChannelType.VOICE}),
        ),
        RoutingRule(
            agent_id="sms-agent",
            conditions=RoutingConditions(channel_types={ChannelType.SMS, ChannelType.WHATSAPP}),
        ),
    ],
    default_agent_id="general-agent",
)
```

### Routing by Intent (Content-Based)

Set `event.metadata["intent"]` via a classification hook, then route by intent:

```python
@kit.hook(HookTrigger.BEFORE_BROADCAST, priority=-200)  # Run before router
async def classify_intent(event: RoomEvent, ctx: RoomContext) -> HookResult:
    if isinstance(event.content, TextContent):
        body = event.content.body.lower()
        intent = "billing" if "invoice" in body or "charge" in body else "general"
        modified = event.model_copy(update={"metadata": {**(event.metadata or {}), "intent": intent}})
        return HookResult.modify(modified)
    return HookResult.allow()

router = ConversationRouter(
    rules=[
        RoutingRule(
            agent_id="billing-agent",
            conditions=RoutingConditions(intents={"billing"}),
        ),
    ],
    default_agent_id="general-agent",
)
```

### Routing with Custom Logic

Use a custom callable for complex routing decisions, including content-based or state-based logic:

```python
def is_high_value_customer(event: RoomEvent, ctx: RoomContext, state: ConversationState) -> bool:
    return state.context.get("customer_tier") == "premium"

def contains_urgency(event: RoomEvent, ctx: RoomContext, state: ConversationState) -> bool:
    if isinstance(event.content, TextContent):
        return any(word in event.content.body.lower() for word in ["urgent", "emergency", "asap"])
    return False

router = ConversationRouter(
    rules=[
        # Premium customers -> senior agent
        RoutingRule(
            agent_id="senior-agent",
            conditions=RoutingConditions(custom=is_high_value_customer),
            priority=-1,  # Check first
        ),
        # Any urgent message -> escalation agent
        RoutingRule(
            agent_id="escalation-agent",
            conditions=RoutingConditions(custom=contains_urgency),
            priority=0,
        ),
    ],
    default_agent_id="general-agent",
    supervisor_id="supervisor-agent",  # Always observes all conversations
)
```

### Combined Conditions

Combine multiple conditions (all are ANDed):

```python
# Only route SMS messages during billing phase to the billing specialist
RoutingRule(
    agent_id="sms-billing-agent",
    conditions=RoutingConditions(
        phases={"billing"},
        channel_types={ChannelType.SMS},
    ),
    priority=0,
)
```

### Installing the Router

```python
# Option 1: Manual hook installation
kit.hook(
    HookTrigger.BEFORE_BROADCAST,
    execution=HookExecution.SYNC,
    priority=-100,
)(router.as_hook())

# Option 2: Use install() which also sets up handoff tools
handler = router.install(kit, agents=[billing_agent, shipping_agent, triage_agent])
```

### Routing Selection Priority

The router evaluates in this order:
1. **Loop prevention** — events FROM intelligence channels are never routed (returns `None`)
2. **Agent affinity** — if `state.active_agent_id` is set and the agent is still attached, stick with it
3. **Rules** — evaluate the `RoutingRule` list in ascending `priority` order; first match wins
4. **Fallback** — return `default_agent_id`
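That selection logic can be modeled in a few lines. This is a simplified sketch — `select_agent` is a hypothetical helper, and rules are reduced to dicts with a precomputed `matches` flag:

```python
def select_agent(active_agent, attached, rules, default_agent_id, from_intelligence):
    """Mirrors the documented selection: loop guard, affinity, rules, fallback."""
    if from_intelligence:            # loop prevention: AI output is never re-routed
        return None
    if active_agent in attached:     # agent affinity: stick with the active agent
        return active_agent
    for rule in sorted(rules, key=lambda r: r["priority"]):  # ascending priority, first match wins
        if rule["matches"]:
            return rule["agent_id"]
    return default_agent_id          # fallback

rules = [{"agent_id": "billing", "priority": 0, "matches": True},
         {"agent_id": "senior", "priority": -1, "matches": True}]
print(select_agent(None, set(), rules, "general", False))  # senior (lowest priority wins)
```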

## Conversation Pipeline (Advanced)

Define stages explicitly:

```python
from roomkit.orchestration.pipeline import ConversationPipeline, PipelineStage

pipeline = ConversationPipeline(stages=[
    PipelineStage(phase="triage", agent_id="triage", next="handling"),
    PipelineStage(phase="handling", agent_id="handler", next="resolution"),
    PipelineStage(phase="resolution", agent_id="resolver"),
])
```

## Status Bus

Agents can publish status updates for UI display:

```python
# Agents publish status via the status bus
# Subscribe to updates
async def on_status(update):
    print(f"Agent {update.agent_id}: {update.message} ({update.level})")

await kit.status_bus.subscribe(on_status)
```

## Delegation

Delegate tasks to background agents in child rooms:

```python
result = await kit.delegate(
    room_id="main-room",
    agent_id="researcher",
    task="Find the latest pricing for product X",
    context={"product": "X"},
)
```

## Memory Providers

Agents use memory providers to maintain conversation context across handoffs:

```python
from roomkit.memory.sliding_window import SlidingWindowMemory
from roomkit.orchestration.handoff import HandoffMemoryProvider

agent = Agent(
    "agent",
    provider=provider,
    memory=HandoffMemoryProvider(SlidingWindowMemory(max_events=50)),
)
```

`HandoffMemoryProvider` wraps any memory provider to inject handoff context (summary from the previous agent) into the conversation history.
---

Transport providers handle sending and receiving messages over external protocols. Each provider implements a channel-specific ABC.

## SMS

### Twilio

```python
from roomkit import RoomKit, SMSChannel
from roomkit.providers.twilio.sms import TwilioSMSProvider
from roomkit.providers.twilio.config import TwilioConfig

sms = SMSChannel("sms-twilio", provider=TwilioSMSProvider(TwilioConfig(
    account_sid="AC...",
    auth_token="...",
    from_number="+15551234567",
)))

kit = RoomKit()
kit.register_channel(sms)
```

### Telnyx

```python
from roomkit.providers.telnyx.sms import TelnyxSMSProvider
from roomkit.providers.telnyx.config import TelnyxConfig

sms = SMSChannel("sms-telnyx", provider=TelnyxSMSProvider(TelnyxConfig(
    api_key="KEY...",
    from_number="+15551234567",
)))
```

### Sinch

```python
from roomkit.providers.sinch.sms import SinchSMSProvider
from roomkit.providers.sinch.config import SinchConfig

sms = SMSChannel("sms-sinch", provider=SinchSMSProvider(SinchConfig(
    service_plan_id="...",
    api_token="...",
    from_number="+15551234567",
)))
```

### VoiceMeUp

```python
from roomkit.providers.voicemeup.sms import VoiceMeUpSMSProvider
from roomkit.providers.voicemeup.config import VoiceMeUpConfig

sms = SMSChannel("sms-vmu", provider=VoiceMeUpSMSProvider(VoiceMeUpConfig(
    username="...",
    auth_token="...",
    from_number="+15551234567",
)))
```

### Webhook Parsing

Each SMS provider has a webhook parser:

```python
from roomkit.providers.twilio.sms import parse_twilio_webhook
from roomkit.providers.telnyx.sms import parse_telnyx_webhook
from roomkit.providers.sinch.sms import parse_sinch_webhook
from roomkit.providers.voicemeup.sms import parse_voicemeup_webhook

# Or use the universal webhook parser
message = await kit.process_webhook(meta=request_data, channel_id="sms-twilio")
```

## RCS

```python
from roomkit import RCSChannel
from roomkit.providers.twilio.rcs import TwilioRCSProvider, TwilioRCSConfig

rcs = RCSChannel("rcs-main", provider=TwilioRCSProvider(TwilioRCSConfig(
    account_sid="AC...",
    auth_token="...",
    messaging_service_sid="MG...",  # Required for RCS (must be RCS-enabled)
)))
```

Also available via Telnyx: `TelnyxRCSProvider`, `TelnyxRCSConfig`.

## Email

### Elastic Email

```python
from roomkit import EmailChannel
from roomkit.providers.elasticemail.email import ElasticEmailProvider
from roomkit.providers.elasticemail.config import ElasticEmailConfig

email = EmailChannel("email-main", provider=ElasticEmailProvider(ElasticEmailConfig(
    api_key="...",
    from_email="support@example.com",
    from_name="Support Team",
)))
```

### SendGrid

```python
from roomkit.providers.sendgrid.email import SendGridEmailProvider
from roomkit.providers.sendgrid.config import SendGridConfig

email = EmailChannel("email-sg", provider=SendGridEmailProvider(SendGridConfig(
    api_key="SG...",
    from_email="support@example.com",
)))
```

## WhatsApp

### Business API (Cloud)

```python
from roomkit import WhatsAppChannel
from roomkit.providers.whatsapp.base import WhatsAppProvider

whatsapp = WhatsAppChannel("wa-business", provider=WhatsAppProvider(
    access_token="...",
    phone_number_id="...",
))
```

### Personal (neonize)

```python
from roomkit import WhatsAppPersonalChannel
from roomkit.providers.whatsapp.personal import WhatsAppPersonalProvider

whatsapp = WhatsAppPersonalChannel("wa-personal", provider=WhatsAppPersonalProvider())
```

Requires `pip install roomkit[whatsapp-personal]`. Uses the neonize library to speak the WhatsApp multi-device protocol, with support for typing indicators, read receipts, and media handling.

## Facebook Messenger

```python
from roomkit import MessengerChannel
from roomkit.providers.messenger.facebook import FacebookMessengerProvider
from roomkit.providers.messenger.config import MessengerConfig

messenger = MessengerChannel("messenger", provider=FacebookMessengerProvider(MessengerConfig(
    page_access_token="...",
    app_secret="...",
    verify_token="...",
)))
```

Webhook parser: `parse_messenger_webhook(request_data, channel_id="messenger")` — returns `list[InboundMessage]`.

## Telegram

```python
from roomkit import TelegramChannel
from roomkit.providers.telegram.bot import TelegramBotProvider
from roomkit.providers.telegram.config import TelegramConfig

telegram = TelegramChannel("telegram", provider=TelegramBotProvider(TelegramConfig(
    bot_token="123456:ABC-DEF...",
)))
```

Webhook parser: `parse_telegram_webhook(request_data, channel_id="telegram")` — returns `list[InboundMessage]`.

## Microsoft Teams

```python
from roomkit import TeamsChannel
from roomkit.providers.teams.bot_framework import BotFrameworkTeamsProvider
from roomkit.providers.teams.config import TeamsConfig

teams = TeamsChannel("teams", provider=BotFrameworkTeamsProvider(TeamsConfig(
    app_id="...",
    app_password="...",
)))
```

Features: proactive messaging, bot mention detection, reaction handling, conversation reference storage.

```python
from roomkit.providers.teams.webhook import parse_teams_webhook, is_bot_added

# Parse incoming Teams activity
activity = parse_teams_webhook(request_data)

# Check if bot was added to a conversation
if is_bot_added(activity):
    # Handle bot installation
    pass
```

## HTTP (Generic Webhook)

```python
from roomkit import HTTPChannel
from roomkit.providers.http.provider import WebhookHTTPProvider
from roomkit.providers.http.config import HTTPProviderConfig

http = HTTPChannel("webhook", provider=WebhookHTTPProvider(HTTPProviderConfig(
    url="https://api.example.com/messages",
    headers={"Authorization": "Bearer ..."},
)))
```

## WebSocket

WebSocket channels don't use a provider — they handle connections directly:

```python
from roomkit import WebSocketChannel

ws = WebSocketChannel("ws-client")

# Register a connection
ws.register_connection("conn-1", on_receive_callback)

# In production, connect to the framework
await kit.connect_websocket("ws-client", "conn-1", send_fn)
await kit.disconnect_websocket("ws-client", "conn-1")
```

## Phone Number Utilities

```python
from roomkit.providers.sms.phone import is_valid_phone, normalize_phone

is_valid_phone("+15551234567")   # True
normalize_phone("555-123-4567")  # "+15551234567"
```
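`normalize_phone` presumably canonicalizes numbers to E.164. A rough self-contained equivalent for US/Canada numbers — illustrative only, not the library's actual logic, which may handle other regions:

```python
import re

def normalize_us_phone(raw: str) -> str:
    """Illustration only: strip punctuation, assume NANP, emit E.164."""
    digits = re.sub(r"\D", "", raw)
    if len(digits) == 10:            # missing country code
        digits = "1" + digits
    return "+" + digits

print(normalize_us_phone("555-123-4567"))    # +15551234567
print(normalize_us_phone("(555) 123-4567"))  # +15551234567
```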

## Delivery Status Tracking

Track delivery status for sent messages:

```python
from roomkit import DeliveryStatus

@kit.on_delivery_status
async def track_delivery(status: DeliveryStatus) -> None:
    if status.status == "failed":
        print(f"Message {status.message_id} failed: {status.error_message}")

# Process status webhooks from providers
await kit.process_delivery_status(status)
```
---

## Identity Resolution

The identity pipeline maps external sender IDs to known participants. It runs as part of the inbound pipeline, after `handle_inbound()` and before hooks.

### How It Works

```
Inbound message arrives with sender_id
  -> IdentityResolver.resolve(sender_id, channel_type)
  -> Returns IdentityResult with status:
     IDENTIFIED      -> participant_id stamped on event, processing continues
     AMBIGUOUS       -> ON_IDENTITY_AMBIGUOUS hook fires
     PENDING         -> ON_IDENTITY_AMBIGUOUS hook fires
     UNKNOWN         -> ON_IDENTITY_UNKNOWN hook fires
     REJECTED        -> ON_IDENTITY_UNKNOWN hook fires
```

### Identity Hooks

```python
from roomkit import RoomKit, HookTrigger
from roomkit.models.identity import IdentityHookResult, Identity

kit = RoomKit()

@kit.identity_hook(HookTrigger.ON_IDENTITY_UNKNOWN)
async def handle_unknown(event, ctx):
    # Option 1: Resolve to a known identity
    return IdentityHookResult.resolved(Identity(
        id="user-123",
        display_name="Alice",
    ))

    # Option 2: Challenge the sender to identify
    # return IdentityHookResult.challenge(inject=InjectedEvent(
    #     content=TextContent(body="Please provide your account number."),
    # ))

    # Option 3: Reject the message
    # return IdentityHookResult.reject("Unknown sender")

    # Option 4: Keep as pending
    # return IdentityHookResult.pending(candidates=[...])
```

### Custom Identity Resolver

```python
from roomkit import InboundMessage, RoomContext
from roomkit.identity.base import IdentityResolver
from roomkit.models.identity import IdentityResult, Identity
from roomkit.models.enums import IdentificationStatus

class DatabaseIdentityResolver(IdentityResolver):
    async def resolve(self, message: InboundMessage, context: RoomContext) -> IdentityResult:
        user = await db.find_by_phone(message.sender_id)  # `db` is your application's data layer
        if user:
            return IdentityResult(
                status=IdentificationStatus.IDENTIFIED,
                identity=Identity(id=user.id, display_name=user.name),
            )
        return IdentityResult(status=IdentificationStatus.UNKNOWN)

kit = RoomKit(identity_resolver=DatabaseIdentityResolver())
```

### Manual Resolution

```python
# Resolve a pending participant to a known identity
await kit.resolve_participant(
    room_id="room-1",
    participant_id="pending-123",
    identity_id="user-456",
)
```

## Realtime Ephemeral Events

Ephemeral events (typing, presence, reactions) are not stored in conversation history. They're delivered in real-time to subscribers.

### Publishing Events

```python
from roomkit import RoomKit

kit = RoomKit()

# Typing indicator
await kit.publish_typing("room-1", "alice", is_typing=True)
await kit.publish_typing("room-1", "alice", is_typing=False)

# Presence
await kit.publish_presence("room-1", "alice", "online")   # online/away/offline

# Reaction
await kit.publish_reaction("room-1", "alice", target_event_id="evt-123", emoji="thumbsup")

# Read receipt
await kit.publish_read_receipt("room-1", "alice", event_id="evt-123")

# Tool call events (AIChannel publishes these automatically)
from roomkit.realtime.base import EphemeralEventType

await kit.publish_tool_call("room-1", "ai-agent", [
    {"id": "tc1", "name": "search", "arguments": {"q": "test"}}
], EphemeralEventType.TOOL_CALL_START)
```

### Subscribing to Events

```python
async def on_ephemeral(event):
    print(f"Ephemeral: {event.type} from {event.user_id}")

sub_id = await kit.subscribe_room("room-1", on_ephemeral)

# Unsubscribe later
await kit.unsubscribe_room(sub_id)
```

### Event Types

| Type | Description |
|------|-------------|
| `TYPING_START` | User started typing |
| `TYPING_STOP` | User stopped typing |
| `PRESENCE_ONLINE` | User came online |
| `PRESENCE_AWAY` | User went away |
| `PRESENCE_OFFLINE` | User went offline |
| `READ_RECEIPT` | User read a message |
| `REACTION` | User reacted to a message |
| `TOOL_CALL_START` | AI started executing a tool |
| `TOOL_CALL_END` | AI finished executing a tool |
| `CUSTOM` | Custom ephemeral event |

### Read Tracking

```python
# Mark a specific event as read
await kit.mark_read("room-1", "ws-user", "evt-123")

# Mark all events as read
await kit.mark_all_read("room-1", "ws-user")
```

### Custom Realtime Backend

The default `InMemoryRealtime` works for single-process deployments. For distributed systems, implement `RealtimeBackend`:

```python
from roomkit.realtime.base import RealtimeBackend

class RedisRealtimeBackend(RealtimeBackend):
    # Implement publish, subscribe, unsubscribe using Redis Pub/Sub
    ...

kit = RoomKit(realtime=RedisRealtimeBackend())
```
---

RoomKit provides built-in resilience patterns for production deployments: rate limiting, circuit breakers, retry with backoff, chain depth limits, and room lifecycle timers.

## Rate Limiting

Apply rate limits per channel binding:

```python
from roomkit import RoomKit
from roomkit.models.channel import RateLimit

kit = RoomKit()

await kit.attach_channel("room-1", "sms-out",
    rate_limit=RateLimit(max_per_second=1.0, max_per_minute=30.0),
)
```

Framework-level inbound rate limiting:

```python
from roomkit import RoomKit
from roomkit.models.channel import RateLimit

kit = RoomKit(inbound_rate_limit=RateLimit(max_per_second=10.0))
```
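Conceptually this is token-bucket style limiting. A minimal standalone sketch — the library's actual algorithm is not documented here, and `TokenBucket` is a hypothetical name:

```python
import time

class TokenBucket:
    """Illustrative limiter: refills `rate` tokens per second up to
    `capacity`; each send consumes one token."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate, self.capacity = rate, capacity
        self.tokens = capacity
        self.updated = time.monotonic()

    def try_acquire(self) -> bool:
        # Refill based on elapsed time, then spend one token if available
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

bucket = TokenBucket(rate=1.0, capacity=30.0)  # roughly: 1/sec sustained, burst of 30
print(bucket.try_acquire())  # True
```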

## Circuit Breaker

Isolate provider failures with circuit breakers:

```python
from roomkit.core.circuit_breaker import CircuitBreaker

cb = CircuitBreaker(failure_threshold=5, recovery_timeout=60.0)

if cb.allow_request():
    try:
        result = await provider.send(event, to="+15551234567")
        cb.record_success()
    except Exception:
        cb.record_failure()  # Opens after 5 consecutive failures
        raise
else:
    ...  # Circuit is open: fast-reject, queue, or fall back
```

States: CLOSED (normal) -> OPEN (failing, fast-reject) -> HALF_OPEN (testing recovery).
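The state machine can be sketched standalone. This is a simplified toy, not RoomKit's implementation — internal fields and the `SimpleBreaker` name are hypothetical:

```python
import time

class SimpleBreaker:
    """Toy three-state breaker mirroring the documented behavior."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = "CLOSED"
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == "OPEN":
            if time.monotonic() - self.opened_at >= self.recovery_timeout:
                self.state = "HALF_OPEN"  # let one probe request through
                return True
            return False  # fast-reject while open
        return True

    def record_success(self) -> None:
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = time.monotonic()
```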

## Retry with Backoff

Retry failed operations with exponential backoff:

```python
from roomkit.models.channel import RetryPolicy
from roomkit.core.retry import retry_with_backoff

policy = RetryPolicy(
    max_retries=3,
    base_delay_seconds=1.0,
    max_delay_seconds=60.0,
)

result = await retry_with_backoff(flaky_function, policy)
```
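Assuming standard doubling with a cap — the library's exact formula isn't shown here, but exponential backoff typically behaves like this — the delay schedule works out to:

```python
def backoff_delays(max_retries: int, base: float, cap: float) -> list[float]:
    # Delay before retry n (0-indexed): base * 2**n, capped at cap
    return [min(base * (2 ** n), cap) for n in range(max_retries)]

print(backoff_delays(3, 1.0, 60.0))  # [1.0, 2.0, 4.0]
print(backoff_delays(8, 1.0, 60.0))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```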

Apply per binding:

```python
await kit.attach_channel("room-1", "sms-out",
    retry_policy=RetryPolicy(max_retries=3, base_delay_seconds=1.0),
)
```

## Chain Depth Limit

Prevents infinite AI-to-AI loops. Default max depth is 5.

```python
kit = RoomKit(max_chain_depth=3)  # Stricter limit
```

When an AI response triggers another AI response, chain depth increments. Processing stops when the limit is reached.
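The guard reduces to a simple depth check — a sketch of the idea, since the real counter lives inside the event pipeline:

```python
MAX_CHAIN_DEPTH = 3

def should_process(chain_depth: int) -> bool:
    # An AI-triggered response carries its parent's depth + 1;
    # processing stops once the configured limit is reached
    return chain_depth < MAX_CHAIN_DEPTH

print([should_process(d) for d in range(5)])  # [True, True, True, False, False]
```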

## Room Lifecycle Timers

Auto-transition rooms based on inactivity:

```python
from roomkit import RoomKit

kit = RoomKit()

room = await kit.create_room(room_id="session-1")
# Configure timers on the room:
# inactive_after_seconds -> ACTIVE to PAUSED
# closed_after_seconds -> to CLOSED

# Check timers for one room
room = await kit.check_room_timers("session-1")

# Batch check all rooms (call periodically)
transitioned = await kit.check_all_timers()
```

## Delivery Status Tracking

Track whether messages were delivered to providers:

```python
from roomkit import DeliveryStatus

@kit.on_delivery_status
async def track(status: DeliveryStatus) -> None:
    if status.status == "failed":
        logger.error("Delivery failed: %s — %s", status.message_id, status.error_message)
    elif status.status == "delivered":
        logger.info("Delivered: %s", status.message_id)

# Process status webhooks from providers
await kit.process_delivery_status(status)
```

## Production Setup Example

```python
from roomkit import RoomKit
from roomkit.models.channel import RateLimit, RetryPolicy
from roomkit.store.postgres import PostgresStore

kit = RoomKit(
    store=PostgresStore("postgresql://user:pass@localhost/roomkit"),
    max_chain_depth=5,
    inbound_rate_limit=RateLimit(max_per_second=50.0),
    process_timeout=30.0,
)

# Per-channel resilience
await kit.attach_channel("room", "sms-out",
    rate_limit=RateLimit(max_per_second=1.0, max_per_minute=30.0),
    retry_policy=RetryPolicy(max_retries=3, base_delay_seconds=1.0),
)
```

## Framework Events for Monitoring

```python
@kit.on("source_error")
async def on_error(event):
    logger.error("Source error: %s", event.data["error"])

@kit.on("voice_session_ended")
async def on_voice_end(event):
    logger.info("Voice session ended: %s", event.data["session_id"])
```
---

RoomKit uses the `ConversationStore` ABC for persistence. The default `InMemoryStore` works out of the box. For production, use `PostgresStore`.

## InMemoryStore (Default)

```python
from roomkit import RoomKit

kit = RoomKit()  # Uses InMemoryStore automatically
```

Data lives in Python dicts — fast for development, lost on restart.

## PostgresStore

Install: `pip install roomkit[postgres]`

```python
from roomkit import RoomKit
from roomkit.store.postgres import PostgresStore

store = PostgresStore("postgresql://user:pass@localhost/roomkit")
await store.init()  # Creates tables if they don't exist

kit = RoomKit(store=store)
```

### Connection Pooling

```python
store = PostgresStore("postgresql://user:pass@localhost/roomkit")
await store.init(min_size=5, max_size=20)  # Pool sizing via init()
```

### Schema

PostgresStore creates 10 tables:

| Table | Purpose |
|-------|---------|
| `rooms` | Room records with status, metadata, timers |
| `events` | Event timeline with sequential indexing |
| `participants` | Room participants with roles and status |
| `bindings` | Channel-to-room bindings with config |
| `identities` | Known identity records |
| `tasks` | AI-extracted tasks |
| `observations` | AI-extracted observations |
| `delivery_status` | Message delivery tracking |
| `read_tracking` | Per-channel read positions |
| `telemetry_spans` | Telemetry span records |

### Operations

```python
# Room operations
room = await kit.create_room(room_id="persistent", metadata={"topic": "billing"})

# Event storage and retrieval
events = await kit.store.list_events("persistent", offset=0, limit=50)

# Timeline query with pagination
timeline = await kit.get_timeline("persistent", offset=0, limit=50)

# Participant management
participants = await kit.store.list_participants("persistent")

# Binding management
bindings = await kit.store.list_bindings("persistent")
```

### Full Example

```python
from __future__ import annotations

import asyncio
import os

from roomkit import InboundMessage, RoomKit, TextContent, WebSocketChannel
from roomkit.store.postgres import PostgresStore


async def main() -> None:
    store = PostgresStore(os.environ["DATABASE_URL"])
    await store.init()

    kit = RoomKit(store=store)

    ws = WebSocketChannel("ws-user")
    kit.register_channel(ws)

    room = await kit.create_room(room_id="support", metadata={"topic": "billing"})
    await kit.attach_channel("support", "ws-user")

    await kit.process_inbound(
        InboundMessage(
            channel_id="ws-user",
            sender_id="user",
            content=TextContent(body="I need help with my invoice"),
        )
    )

    # Data persists across restarts
    events = await kit.store.list_events("support")
    print(f"Stored {len(events)} events")

    await kit.close()


if __name__ == "__main__":
    asyncio.run(main())
```

## Custom Store

Implement `ConversationStore` for other backends (Redis, DynamoDB, etc.):

```python
from roomkit.store.base import ConversationStore
from roomkit.models.room import Room

class RedisStore(ConversationStore):
    def __init__(self, redis) -> None:
        self.redis = redis  # e.g. a redis.asyncio.Redis client

    async def create_room(self, room: Room) -> Room:
        await self.redis.set(f"room:{room.id}", room.model_dump_json())
        return room

    # Implement all abstract methods...

kit = RoomKit(store=RedisStore(redis_client))  # redis_client created elsewhere
```
---

RoomKit provides mock implementations for every pluggable component: AI providers, voice backends, pipeline stages, identity resolvers, and telemetry.

## Test Setup

```bash
pip install roomkit[dev]
uv run pytest              # Run all tests
uv run pytest tests/test_framework.py -v  # Specific file
```

Configuration in `pyproject.toml`:

```toml
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"  # No @pytest.mark.asyncio needed
```

## Basic Test Pattern

```python
from roomkit import RoomKit, InboundMessage, TextContent, WebSocketChannel


class TestMyFeature:
    async def test_message_delivery(self) -> None:
        kit = RoomKit()

        ws = WebSocketChannel("ws-user")
        kit.register_channel(ws)

        inbox: list = []
        async def on_recv(_conn: str, event) -> None:
            inbox.append(event)

        ws.register_connection("conn", on_recv)

        await kit.create_room(room_id="test-room")
        await kit.attach_channel("test-room", "ws-user")

        result = await kit.process_inbound(
            InboundMessage(
                channel_id="ws-user",
                sender_id="user",
                content=TextContent(body="Hello"),
            )
        )

        assert not result.blocked
        assert len(inbox) == 1
        assert inbox[0].content.body == "Hello"
```

## Mock AI Provider

```python
from roomkit.providers.ai.mock import MockAIProvider
from roomkit.channels.ai import AIChannel
from roomkit import ChannelCategory

# Responds with pre-configured messages in order
provider = MockAIProvider(responses=["Response 1", "Response 2"])

ai = AIChannel("ai", provider=provider)
kit.register_channel(ai)
await kit.attach_channel("room", "ai", category=ChannelCategory.INTELLIGENCE)

# After processing, check what the AI was called with:
assert len(provider.calls) == 1
last_call = provider.calls[-1]
print(last_call.system_prompt)
print(last_call.temperature)
print([t.name for t in last_call.tools])
```

## Mock Voice Backend

```python
from roomkit.voice.backends.mock import MockVoiceBackend

backend = MockVoiceBackend()

# Simulate audio input
backend.simulate_audio_received(session, audio_frame)  # session from kit.join()
```

## Mock Pipeline Providers

Every pipeline stage has a mock that accepts pre-configured event sequences:

```python
from roomkit.voice.pipeline import (
    MockVADProvider,
    VADEvent,
    VADEventType,
    MockDenoiserProvider,
    MockDiarizationProvider,
    MockAGCProvider,
    MockAECProvider,
    MockDTMFDetector,
    MockAudioRecorder,
    MockTurnDetector,
    MockBackchannelDetector,
)

# VAD with event sequence
vad = MockVADProvider(events=[
    VADEvent(type=VADEventType.SPEECH_START),
    None,  # No event for this frame
    VADEvent(type=VADEventType.SPEECH_END, audio_bytes=b"speech"),
])

# Other mocks
denoiser = MockDenoiserProvider()
diarizer = MockDiarizationProvider()
agc = MockAGCProvider()
aec = MockAECProvider()
dtmf = MockDTMFDetector()
recorder = MockAudioRecorder()
turn = MockTurnDetector()
backchannel = MockBackchannelDetector()
```

## Mock STT/TTS

```python
from roomkit.voice.stt.mock import MockSTTProvider
from roomkit.voice.tts.mock import MockTTSProvider

stt = MockSTTProvider(transcriptions=["Hello", "How are you?"])
tts = MockTTSProvider()
```

## Mock Identity Resolver

```python
from roomkit.identity.mock import MockIdentityResolver

resolver = MockIdentityResolver()
kit = RoomKit(identity_resolver=resolver)
```

## Mock Realtime Provider

```python
from roomkit.voice.realtime.mock import MockRealtimeProvider, MockRealtimeTransport

provider = MockRealtimeProvider()
transport = MockRealtimeTransport()
```

## Testing Hooks

```python
from roomkit import HookResult, HookTrigger, InboundMessage, RoomKit, TextContent, WebSocketChannel

async def test_hook_blocks_message() -> None:
    kit = RoomKit()
    ws = WebSocketChannel("ws")
    kit.register_channel(ws)

    await kit.create_room(room_id="r")
    await kit.attach_channel("r", "ws")

    @kit.hook(HookTrigger.BEFORE_BROADCAST)
    async def blocker(event, ctx):
        return HookResult.block("blocked")

    result = await kit.process_inbound(
        InboundMessage(
            channel_id="ws",
            sender_id="user",
            content=TextContent(body="test"),
        )
    )

    assert result.blocked
    assert result.reason == "blocked"
```

## Testing Voice Pipeline

```python
from roomkit import RoomKit, VoiceChannel
from roomkit.voice.pipeline import AudioPipelineConfig, MockVADProvider, VADEvent, VADEventType
from roomkit.voice.stt.mock import MockSTTProvider
from roomkit.voice.tts.mock import MockTTSProvider
from roomkit.voice.backends.mock import MockVoiceBackend
from roomkit.voice.audio_frame import AudioFrame

async def test_voice_pipeline() -> None:
    kit = RoomKit()

    backend = MockVoiceBackend()
    stt = MockSTTProvider(transcriptions=["Hello"])
    tts = MockTTSProvider()
    vad = MockVADProvider(events=[
        VADEvent(type=VADEventType.SPEECH_START),
        None,
        VADEvent(type=VADEventType.SPEECH_END, audio_bytes=b"audio"),
    ])

    voice = VoiceChannel(
        "voice", stt=stt, tts=tts, backend=backend,
        pipeline=AudioPipelineConfig(vad=vad),
    )
    kit.register_channel(voice)

    await kit.create_room(room_id="call")
    await kit.attach_channel("call", "voice")

    # Simulate audio input
    frame = AudioFrame(data=b"\x00" * 320, sample_rate=16000)
    backend.simulate_audio_received(None, frame)
```

## Testing Orchestration

```python
from roomkit import Agent, InboundMessage, Pipeline, RoomKit, TextContent, WebSocketChannel
from roomkit.providers.ai.mock import MockAIProvider

async def test_pipeline_orchestration() -> None:
    agent1 = Agent("a1", provider=MockAIProvider(responses=["Transferring..."]))
    agent2 = Agent("a2", provider=MockAIProvider(responses=["Resolved!"]))

    kit = RoomKit(orchestration=Pipeline(agents=[agent1, agent2]))

    ws = WebSocketChannel("ws")
    kit.register_channel(ws)

    await kit.create_room(room_id="test")
    await kit.attach_channel("test", "ws")

    # First message goes to agent1
    result = await kit.process_inbound(
        InboundMessage(channel_id="ws", sender_id="user", content=TextContent(body="Help"))
    )
    assert not result.blocked
```

## Test Utilities

```python
# Pydantic model updates (immutable)
modified = event.model_copy(update={"content": TextContent(body="new")})

# Never mutate models directly — always use model_copy
```
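The same copy-on-write discipline can be shown with stdlib frozen dataclasses, used here as a stand-in analogue of Pydantic's `model_copy(update=...)` so the pattern runs without RoomKit:

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class FakeEvent:
    """Stand-in for an immutable RoomEvent."""
    id: str
    body: str


original = FakeEvent(id="e1", body="old")
modified = replace(original, body="new")  # dataclasses.replace ~ model_copy(update=...)

assert original.body == "old"  # the original is untouched
assert modified.body == "new"
```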
---

RoomKit exports **73 symbols** from the top-level `roomkit` package. Provider and voice types are imported from subpackages.

## Top-Level Imports (`from roomkit import ...`)

### Framework

| Symbol | Description |
|--------|-------------|
| `RoomKit` | Central orchestrator — rooms, channels, hooks, storage |

### Channels

| Symbol | Description |
|--------|-------------|
| `Agent` | AI agent with role, description, greeting, tools |
| `AIChannel` | Intelligence layer for AI responses |
| `AudioVideoChannel` | Combined audio + video channel |
| `Channel` | Base class for all channels |
| `CLIChannel` | CLI/terminal interactive channel |
| `EmailChannel` | Email transport channel factory |
| `HTTPChannel` | HTTP webhook transport channel factory |
| `MessengerChannel` | Facebook Messenger transport channel factory |
| `RCSChannel` | RCS transport channel factory |
| `RealtimeAudioVideoChannel` | Realtime speech-to-speech with video |
| `RealtimeVoiceChannel` | Speech-to-speech AI channel |
| `SMSChannel` | SMS transport channel factory |
| `TeamsChannel` | Microsoft Teams transport channel factory |
| `TelegramChannel` | Telegram Bot transport channel factory |
| `TransportChannel` | Generic transport channel wrapper |
| `VideoChannel` | Video channel with vision pipeline |
| `VoiceChannel` | Real-time audio with STT/TTS/pipeline |
| `WebSocketChannel` | WebSocket bidirectional channel |
| `WhatsAppChannel` | WhatsApp Business API channel factory |
| `WhatsAppPersonalChannel` | WhatsApp Personal (neonize) channel factory |

### Enums

| Symbol | Description |
|--------|-------------|
| `Access` | Channel access levels: READ_WRITE, READ_ONLY, WRITE_ONLY, NONE |
| `ChannelCategory` | TRANSPORT or INTELLIGENCE |
| `ChannelType` | SMS, EMAIL, WHATSAPP, VOICE, AI, WEBSOCKET, etc. |
| `EventStatus` | DELIVERED, BLOCKED, etc. |
| `EventType` | MESSAGE, SYSTEM, EDIT, DELETE, etc. |
| `HookExecution` | SYNC or ASYNC |
| `HookTrigger` | 40+ hook triggers (BEFORE_BROADCAST, AFTER_BROADCAST, etc.) |
| `RoomStatus` | ACTIVE, PAUSED, CLOSED, ARCHIVED |

### Orchestration

| Symbol | Description |
|--------|-------------|
| `Loop` | Producer/reviewer cycle strategy |
| `Orchestration` | ABC for orchestration strategies |
| `Pipeline` | Linear agent chain strategy |
| `Supervisor` | Supervisor delegates to workers strategy |
| `Swarm` | Bidirectional handoff strategy |

### Models

| Symbol | Description |
|--------|-------------|
| `ChannelBinding` | Binding of a channel to a room |
| `ChannelCapabilities` | Declared capabilities of a channel |
| `ChannelOutput` | Output of a channel delivery |
| `DeliveryResult` | Result of delivering a message |
| `DeliveryStatus` | Delivery status from provider webhook |
| `EventSource` | Source attribution for an event |
| `FrameworkEvent` | Lightweight framework lifecycle event |
| `HookResult` | Result from sync hooks: `.allow()`, `.block(reason)`, `.modify(event)` |
| `InjectedEvent` | Event injected by a hook |
| `InboundMessage` | Incoming message from a provider |
| `InboundResult` | Result of processing an inbound message |
| `Participant` | Participant data model |
| `ProviderResult` | Result from a provider operation |
| `Room` | Room data model |
| `RoomContext` | Context passed to hooks (room, bindings, participants, events) |
| `RoomEvent` | Core event stored in the timeline |
| `RoomTimers` | Timer configuration for room inactivity |
| `SessionStartedEvent` | Event fired when a voice session starts |
| `TextContent` | Plain text content |
| `Tool` | Base class for tool definitions |
| `ToolCallCallback` | Callback type for tool call events |
| `ToolCallEvent` | Tool call event model |
| `ToolHandler` | Tool handler type for realtime voice |
| `get_current_voice_session` | Get the current voice session from context |

### Errors

| Symbol | Description |
|--------|-------------|
| `RoomKitError` | Base exception |
| `RoomNotFoundError` | Room does not exist |
| `ChannelNotFoundError` | Channel not attached to room |
| `ChannelNotRegisteredError` | Channel not registered with framework |
| `ParticipantNotFoundError` | Participant not found in room |
| `IdentityNotFoundError` | Identity not found |
| `SourceAlreadyAttachedError` | Source already attached |
| `SourceNotFoundError` | No source attached |
| `VoiceBackendNotConfiguredError` | Voice backend not configured |
| `VoiceNotConfiguredError` | Voice (STT/TTS) not configured |
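
Because `RoomKitError` is the base exception, one broad handler catches any framework error. The pattern, sketched with stand-in classes (assuming, as the table implies, that every listed error subclasses the base):

```python
class RoomKitError(Exception):
    """Stand-in mirror of the real base exception."""


class RoomNotFoundError(RoomKitError):
    """Stand-in subclass."""


def lookup(room_id: str) -> str:
    raise RoomNotFoundError(f"no such room: {room_id}")


try:
    lookup("ghost")
except RoomKitError as exc:  # catches any error in the hierarchy
    caught = str(exc)

print(caught)  # no such room: ghost
```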

### AI Documentation Helpers

| Symbol | Description |
|--------|-------------|
| `get_llms_txt()` | Get llms.txt content |
| `get_llms_full_txt()` | Get llms-full.txt content (comprehensive) |
| `get_agents_md()` | Get AGENTS.md content |
| `get_ai_context()` | Get combined AI context |

## RoomKit Constructor

```python
kit = RoomKit(
    store=InMemoryStore(),                 # ConversationStore implementation
    identity_resolver=None,                # IdentityResolver implementation
    identity_channel_types=None,           # Channel types to resolve identity for
    inbound_router=None,                   # InboundRoomRouter implementation
    lock_manager=None,                     # RoomLockManager implementation
    realtime=None,                         # RealtimeBackend implementation
    max_chain_depth=5,                     # AI-to-AI loop prevention
    identity_timeout=10.0,                 # Identity resolution timeout (seconds)
    process_timeout=30.0,                  # Inbound processing timeout (seconds)
    stt=None,                              # STTProvider for transcription
    tts=None,                              # TTSProvider for synthesis
    voice=None,                            # VoiceBackend for audio transport
    task_runner=None,                      # Background task runner
    delivery_strategy=None,                # Delivery strategy
    status_bus=None,                       # Status bus for orchestration
    telemetry=None,                        # TelemetryProvider
    inbound_rate_limit=None,               # Framework-level rate limit
    orchestration=None,                    # Orchestration strategy
)
```

## Key RoomKit Methods

### Room Lifecycle

| Method | Description |
|--------|-------------|
| `create_room(room_id?, metadata?, orchestration?)` | Create a room |
| `get_room(room_id)` | Get room by ID |
| `close_room(room_id)` | Close a room |
| `update_room_metadata(room_id, metadata)` | Update room metadata |
| `check_room_timers(room_id)` | Check timer transitions for one room |
| `check_all_timers()` | Check all room timers |

### Channel Operations

| Method | Description |
|--------|-------------|
| `register_channel(channel)` | Register a channel |
| `attach_channel(room_id, channel_id, category?, access?, ...)` | Attach channel to room |
| `detach_channel(room_id, channel_id)` | Detach channel from room |
| `mute(room_id, channel_id)` | Mute a channel |
| `unmute(room_id, channel_id)` | Unmute a channel |
| `set_access(room_id, channel_id, access)` | Set channel access level |

### Voice/Video

| Method | Description |
|--------|-------------|
| `join(room_id, channel_id, participant_id?, ...)` | Join voice/video session |
| `leave(session)` | Leave voice/video session |
| `transcribe(audio)` | Speech-to-text |
| `synthesize(text, voice?)` | Text-to-speech |

### Inbound Pipeline

| Method | Description |
|--------|-------------|
| `process_inbound(message, room_id?)` | Process an inbound message |

### Hooks

| Method | Description |
|--------|-------------|
| `hook(trigger, execution?, priority?, ...)` | Decorator to register a hook |
| `on(event_type)` | Decorator for framework events |
| `identity_hook(trigger, ...)` | Decorator for identity hooks |
| `on_delivery_status(fn)` | Decorator for delivery status |
| `add_room_hook(room_id, trigger, execution, fn, ...)` | Add room-scoped hook |
| `remove_room_hook(room_id, name)` | Remove room-scoped hook |

### Realtime

| Method | Description |
|--------|-------------|
| `publish_typing(room_id, user_id, is_typing?)` | Typing indicator |
| `publish_presence(room_id, user_id, status)` | Presence update |
| `publish_reaction(room_id, user_id, target_event_id, emoji)` | Reaction |
| `publish_read_receipt(room_id, user_id, event_id)` | Read receipt |
| `subscribe_room(room_id, callback)` | Subscribe to ephemeral events |
| `unsubscribe_room(subscription_id)` | Unsubscribe |

### Sources

| Method | Description |
|--------|-------------|
| `attach_source(channel_id, source, auto_restart?, ...)` | Attach event source |
| `detach_source(channel_id)` | Detach event source |
| `source_health(channel_id)` | Get source health |

### Other

| Method | Description |
|--------|-------------|
| `delegate(room_id, agent_id, task, ...)` | Delegate to background agent |
| `send_greeting(room_id, channel_id?, greeting?, ...)` | Send greeting |
| `send_event(room_id, channel_id, content, ...)` | Send event directly |
| `get_timeline(room_id, offset?, limit?)` | Query event timeline |
| `close()` | Shutdown framework |

## Provider Subpackage Imports

### AI Providers

```python
from roomkit.providers.anthropic.ai import AnthropicAIProvider
from roomkit.providers.anthropic.config import AnthropicConfig
from roomkit.providers.openai.ai import OpenAIAIProvider
from roomkit.providers.openai.config import OpenAIConfig
from roomkit.providers.gemini.ai import GeminiAIProvider
from roomkit.providers.gemini.config import GeminiConfig
from roomkit.providers.mistral.ai import MistralAIProvider
from roomkit.providers.mistral.config import MistralConfig
from roomkit.providers.ai.mock import MockAIProvider
from roomkit.providers.ai.base import AIProvider, AIContext, AIResponse, AITool, AIToolCall
```

### SMS Providers

```python
from roomkit.providers.twilio.sms import TwilioSMSProvider
from roomkit.providers.twilio.config import TwilioConfig
from roomkit.providers.telnyx.sms import TelnyxSMSProvider
from roomkit.providers.telnyx.config import TelnyxConfig
from roomkit.providers.sinch.sms import SinchSMSProvider
from roomkit.providers.sinch.config import SinchConfig
from roomkit.providers.sms.mock import MockSMSProvider
```

### Voice (Lazy Loaders)

```python
from roomkit.voice import (
    get_deepgram_provider, get_deepgram_config,
    get_elevenlabs_provider, get_elevenlabs_config,
    get_sherpa_onnx_stt_provider, get_sherpa_onnx_tts_provider,
    get_local_audio_backend,
    get_fastrtc_backend,
    get_rtp_backend,
    get_sip_backend,
    get_gemini_live_provider,
    get_openai_realtime_provider,
    get_xai_realtime_provider,
    get_websocket_realtime_transport,
    get_speex_aec_provider,
    get_rnnoise_denoiser_provider,
)
```
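These `get_*` helpers defer importing heavy optional dependencies until first call. The general lazy-loader shape can be sketched with `importlib` (illustrative only; `get_json_decoder` is a hypothetical stand-in, not a RoomKit API):

```python
import importlib


def get_json_decoder():
    """Lazily import a module and return a symbol from it.

    Mirrors the shape of the get_* loaders above: the import cost, and any
    missing-dependency ImportError, is paid only when the helper is called.
    """
    module = importlib.import_module("json")  # stand-in for an optional dependency
    return module.JSONDecoder


decoder_cls = get_json_decoder()
print(decoder_cls().decode('{"ok": true}'))  # {'ok': True}
```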

### Voice Mocks

```python
from roomkit.voice.backends.mock import MockVoiceBackend
from roomkit.voice.stt.mock import MockSTTProvider
from roomkit.voice.tts.mock import MockTTSProvider
from roomkit.voice.realtime.mock import MockRealtimeProvider, MockRealtimeTransport
```

### Pipeline

```python
from roomkit.voice.pipeline import (
    AudioPipelineConfig, VADConfig,
    MockVADProvider, VADEvent, VADEventType,
    MockDenoiserProvider, MockDiarizationProvider,
    MockAGCProvider, MockAECProvider, MockDTMFDetector,
    MockAudioRecorder, MockTurnDetector, MockBackchannelDetector,
)
from roomkit.voice.interruption import InterruptionConfig, InterruptionStrategy
from roomkit.voice.audio_frame import AudioFrame
```

### Orchestration

```python
from roomkit.orchestration.state import get_conversation_state, ConversationState
from roomkit.orchestration.router import ConversationRouter, RoutingRule
from roomkit.orchestration.pipeline import ConversationPipeline, PipelineStage
from roomkit.orchestration.handoff import HandoffHandler, HandoffMemoryProvider
```

### Storage

```python
from roomkit.store.base import ConversationStore
from roomkit.store.memory import InMemoryStore
from roomkit.store.postgres import PostgresStore
```

### Content Types

```python
from roomkit.models.event import (
    TextContent, RichContent, MediaContent, AudioContent, VideoContent,
    LocationContent, CompositeContent, TemplateContent, SystemContent,
    EditContent, DeleteContent,
)
```
