Metadata-Version: 2.1
Name: aiavatar
Version: 0.8.16
Summary: 🥰 Building AI-based conversational avatars lightning fast ⚡️💬
Home-page: https://github.com/uezo/aiavatar
Author: uezo
Author-email: uezo@uezo.net
Maintainer: uezo
Maintainer-email: uezo@uezo.net
License: Apache v2
Classifier: Programming Language :: Python :: 3
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: httpx>=0.27.0
Requires-Dist: openai>=1.55.3
Requires-Dist: aiofiles>=24.1.0
Requires-Dist: numpy>=2.2.3
Requires-Dist: PyAudio>=0.2.14
Requires-Dist: silero-vad>=6.0.0

# AIAvatarKit

🥰 Building AI-based conversational avatars lightning fast ⚡️💬

![AIAvatarKit Architecture Overview](documents/images/aiavatarkit_overview.png) 

## ✨ Features

- **🌏 Live anywhere**: AIAvatarKit is a general-purpose Speech-to-Speech framework with multimodal input/output support. It can serve as the backend for a wide range of conversational AI systems.
    - Metaverse Platforms: Compatible with VRChat, cluster, Vket Cloud, and other platforms
    - Standalone Apps: Enables ultra-low latency real-time interaction via WebSocket or HTTP (SSE), with a unified interface that abstracts differences between LLMs
    - Channels and Devices: Supports edge devices like Raspberry Pi and telephony services like Twilio
- **🧩 Modular architecture**: Components such as VAD, STT, LLM, and TTS are modular and easy to integrate via lightweight interfaces. Supported modules include:
    - VAD: Built-in standard VAD (silence-based end-of-turn detection), SileroVAD
    - STT: Google, Azure, OpenAI, AmiVoice
    - LLM: ChatGPT, OpenAI Responses API (REST / WebSocket), Gemini, Claude, and any model supported by LiteLLM or Dify
    - TTS: VOICEVOX / AivisSpeech, OpenAI, SpeechGateway (including Style-Bert-VITS2 and Aivis Cloud API)
- **⚡️ AI Agent native**: Designed to support agentic systems. In addition to standard tool calls, it offers Dynamic Tool Calls for extensibility and supports progress feedback for high-latency operations.


## 🚀 Quick start

**Requirements**: Python 3.11+, OpenAI API key, and a running VOICEVOX instance for TTS

### 📺 Local (Console)

Install AIAvatarKit.

```sh
pip install aiavatar
```

**NOTE:** If the steps in technical blogs don’t work as expected, the blog may be based on a version prior to v0.6. Some features may be limited, but you can try downgrading with `pip install aiavatar==0.5.8` to match the environment described in the blog.


Save the following script as `run.py`.

```python
import asyncio
from aiavatar import AIAvatar

OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"

aiavatar_app = AIAvatar(
    openai_api_key=OPENAI_API_KEY,
    debug=True
)
asyncio.run(aiavatar_app.start_listening())
```

Start AIAvatar. Also, don't forget to launch VOICEVOX beforehand.

```bash
$ python run.py
```

Conversation will start when you say the wake word "こんにちは" (or "Hello" when language is not `ja-JP`).

Feel free to enjoy the conversation afterwards!


### 🌐 WebSocket (Browser)

Install AIAvatarKit and additional dependencies.

```sh
pip install aiavatar fastapi uvicorn websockets
```

Save the following script as `ws.py`.

```python
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from aiavatar.adapter.websocket.server import AIAvatarWebSocketServer
from aiavatar.util import download_example

OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"

# Download example UI if not exists
download_example("websocket/html")

# Build Speech-to-Speech pipeline with WebSocket adapter
aiavatar_app = AIAvatarWebSocketServer(
    openai_api_key=OPENAI_API_KEY
)

# Build websocket server
app = FastAPI()
router = aiavatar_app.get_websocket_router()
app.include_router(router)
app.mount("/static", StaticFiles(directory="html"), name="static")

# Setup admin panel (Optional)
from aiavatar.admin import setup_admin_panel
setup_admin_panel(app, adapter=aiavatar_app)
```

Start server. Also, don't forget to launch VOICEVOX beforehand.

```bash
$ python -m uvicorn ws:app
```

Open the following URLs and enjoy the conversation!

- Character icon (dynamic expression, lip sync, blinking): http://127.0.0.1:8000/static/index.html
- MotionPNGTuber: http://127.0.0.1:8000/static/mpt.html

You can also access the Admin Panel at http://127.0.0.1:8000/admin.


## 🔖 Contents

- [🎓 Generative AI](#-generative-ai)
    - [ChatGPT](#chatgpt)
    - [OpenAI Responses API](#openai-responses-api)
    - [Claude](#claude)
    - [Gemini](#gemini)
    - [Dify](#dify)
    - [OpenAI-compatible APIs](#openai-compatible-apis)
    - [Other LLMs](#other-llms)

- [🗣️ Voice](#️voice)

- [👂 Speech Listener](#-speech-listener)
    - [Preprocessing and Postprocessing](#preprocessing-and-postprocessing)
    - [Speaker Diarization](#speaker-diarization)

- [🎙️ Speech Detector](#%EF%B8%8F-speech-detector)
    - [Silero Speech Detector](#silero-speech-detector)
    - [Silero Stream Speech Detector](#silero-stream-speech-detector)
    - [Azure Stream Speech Detector](#azure-stream-speech-detector)
    - [AWS Stream Speech Detector](#aws-stream-speech-detector)
    - [Customization](#customization)
    - [Standard Speech Detector (Legacy)](#standard-speech-detector-legacy)

- [🥰 Face Expression](#-face-expression)

- [💃 Animation](#-animation)

- [🥳 Character Management](#-character-management)
    - [Get started](#get-started)
    - [Updating Diaries](#updating-diaries)
    - [Updating Schedules](#updating-schedules)
    - [Automated Daily Updates](#automated-daily-updates)
    - [Batch Generation](#batch-generation)
    - [Long-term Memory](#long-term-memory)
    - [Binding to Adapter](#binding-to-adapter)

- [🧩 API](#-api)
    - [💫 RESTful API (SSE)](#-restful-api-sse)
    - [🔵 Dify-compatible API](#-dify-compatible-api)
    - [🔌 WebSocket](#-websocket)
    - [🟩 LINE Bot](#-line-bot)

- [🦜 AI Agent](#-ai-agent)
    - [⚡️ Tool Call](#️-tool-call)
    - [⌛️ Tool Call with Streaming Progress](#%EF%B8%8F-tool-call-with-streaming-progress)
    - [🔄 Background Tool Execution](#-background-tool-execution)
    - [📋 Tool Response Formatter (Direct Response)](#-tool-response-formatter-direct-response)
    - [📦 Structured Content (Client-side Data)](#-structured-content-client-side-data)
    - [🪄 Dynamic Tool Call](#-dynamic-tool-call)
    - [🔌 MCP](#-mcp)
    - [🛠️ Built-in Tools](#️-built-in-tools)
    - [🦞 OpenClaw / Hermes](#-openclaw--hermes)

- [🛡️ Guardrail](#%EF%B8%8F-guardrail)

- [🌎 Platform Guide](#-platform-guide)
    - [🐈 VRChat](#-vrchat)
    - [🍓 Raspberry Pi](#-raspberry-pi)

- [⚙️ Administration](#️-administration)
    - [Admin Panel](#admin-panel)
    - [REST API](#rest-api)
    - [📈 Observability](#-observability)

- [🧪 Evaluation](#-evaluation)

- [🤿 Deep Dive](#-deep-dive)
    - [🐘 PostgreSQL](#-postgresql)
    - [👀 Vision](#-vision)
    - [💾 Long-term Memory](#-long-term-memory)
    - [🐓 Wakeword](#-wakeword)
    - [📋 System Prompt Parameters](#-system-prompt-parameters)
    - [🎛️ Inline LLM Parameters](#️-inline-llm-parameters)
    - [⏰ Timestamp Insertion](#-timestamp-insertion)
    - [🧵 Request merging](#-request-merging)
    - [📥 Invoke Queue](#-invoke-queue)
    - [🧺 Shared Context](#-shared-context)
    - [🔗 Channel Session Manager](#-channel-session-manager)
    - [📡 Channel-aware Processing](#-channel-aware-processing)
    - [🔈 Audio Device](#-audio-device)
    - [🐆 Quick Response](#-quick-response)
    - [🎭 Custom Behavior](#-custom-behavior)
    - [✅ Request Validation](#-request-validation)
    - [🎚️ Noise Filter](#%EF%B8%8F-noise-filter)
    - [🔄 Migration Guide: From v0.6.x to v0.7.0](#-migration-guide-from-v06x-to-v070)


## 🎓 Generative AI

You can set the model and system prompt when instantiating `AIAvatar`.

```python
aiavatar_app = AIAvatar(
    openai_api_key="YOUR_OPENAI_API_KEY",
    openai_model="gpt-4o",
    system_prompt="You are my cat."
)
```

### ChatGPT

To configure the LLM in detail, create an instance of `ChatGPTService` with custom parameters and pass it to `AIAvatar`.

```python
# Create ChatGPTService
from aiavatar.sts.llm.chatgpt import ChatGPTService
llm = ChatGPTService(
    openai_api_key=OPENAI_API_KEY,
    model="gpt-4o",
    temperature=0.0,
    system_prompt="You are my cat."
)

# Create AIAvatar with ChatGPTService
aiavatar_app = AIAvatar(
    llm=llm,
    openai_api_key=OPENAI_API_KEY   # API Key for STT
)
```

### OpenAI Responses API

Use `OpenAIResponsesService` to leverage the OpenAI Responses API. Conversation history is managed server-side via `previous_response_id`, eliminating the need for client-side context management.

```python
from aiavatar.sts.llm.openai_responses import OpenAIResponsesService
llm = OpenAIResponsesService(
    openai_api_key=OPENAI_API_KEY,
    model="gpt-5.4",
    system_prompt="You are my cat."
)

aiavatar_app = AIAvatar(
    llm=llm,
    openai_api_key=OPENAI_API_KEY   # API Key for STT
)
```

For lower latency, use the WebSocket variant. This maintains persistent connections via a connection pool, which can reduce latency by up to 40%, especially in tool-call-heavy workflows.

```python
# pip install websockets
from aiavatar.sts.llm.openai_responses_websocket import OpenAIResponsesWebSocketService
llm = OpenAIResponsesWebSocketService(
    openai_api_key=OPENAI_API_KEY,
    model="gpt-5.4",
    reasoning_effort="low",
    system_prompt="You are my cat."
)
```

NOTE: The WebSocket variant does not support the `temperature` parameter. Use `reasoning_effort` ("none", "low", "medium", "high") instead to control response behavior. Dynamic Tool Calls are not supported in either variant, as the server-side history management via `previous_response_id` is incompatible with the pre-flight tool filtering calls.


### Claude

Create an instance of `ClaudeService` with custom parameters and pass it to `AIAvatar`. The default model is `claude-sonnet-4-5`.

```python
# Create ClaudeService
from aiavatar.sts.llm.claude import ClaudeService
llm = ClaudeService(
    anthropic_api_key=ANTHROPIC_API_KEY,
    model="claude-sonnet-4-5",
    temperature=0.0,
    system_prompt="You are my cat."
)

# Create AIAvatar with ClaudeService
aiavatar_app = AIAvatar(
    llm=llm,
    openai_api_key=OPENAI_API_KEY   # API Key for STT
)
```

NOTE: Claude is currently supported through the Anthropic API only, not Amazon Bedrock. To use Bedrock, go through LiteLLM or another API proxy.


### Gemini

Create an instance of `GeminiService` with custom parameters and pass it to `AIAvatar`. The default model is `gemini-2.0-flash-exp`.

```python
# Create GeminiService
# pip install google-genai
from aiavatar.sts.llm.gemini import GeminiService
llm = GeminiService(
    gemini_api_key=GEMINI_API_KEY,
    model="gemini-2.0-pro-latest",
    temperature=0.0,
    system_prompt="You are my cat."
)

# Create AIAvatar with GeminiService
aiavatar_app = AIAvatar(
    llm=llm,
    openai_api_key=OPENAI_API_KEY   # API Key for STT
)
```

NOTE: Gemini is currently supported through Google AI Studio only, not Vertex AI. To use Vertex AI, go through LiteLLM or another API proxy.


### Dify

You can use the Dify API instead of a specific LLM's API. This eliminates the need to manage code for tools or RAG locally.

```python
# Create DifyService
from aiavatar.sts.llm.dify import DifyService
llm = DifyService(
    api_key=DIFY_API_KEY,
    base_url=DIFY_URL,
    user="aiavatarkit_user",
    is_agent_mode=True
)

# Create AIAvatar with DifyService
aiavatar_app = AIAvatar(
    llm=llm,
    openai_api_key=OPENAI_API_KEY   # API Key for STT
)
```


### OpenAI-compatible APIs

`ChatGPTService` supports OpenAI-compatible APIs, such as Grok, Gemini, and Claude.

By specifying the `model`, `openai_api_key`, and `base_url`, these models can now be used with a non-reasoning configuration out of the box.

```python
# Grok
MODEL = "grok-4-1-fast-non-reasoning"
OPENAI_API_KEY = "YOUR_XAI_API_KEY"
BASE_URL = "https://api.x.ai/v1"

# Gemini on Google AI Studio
MODEL = "gemini-2.5-flash"
OPENAI_API_KEY = "YOUR_GEMINI_API_KEY"
BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"

# Claude on Anthropic
MODEL = "claude-haiku-4-5"
OPENAI_API_KEY = "YOUR_ANTHROPIC_API_KEY"
BASE_URL = "https://api.anthropic.com/v1/"

# Configure ChatGPTService
from aiavatar.sts.llm.chatgpt import ChatGPTService
llm = ChatGPTService(
    openai_api_key=OPENAI_API_KEY,
    base_url=BASE_URL,
    model=MODEL,
    system_prompt=SYSTEM_PROMPT,
    # extra_body={"thinking": { "type": "disabled"}},   # Claude
)
```


### Other LLMs

You can use other LLMs through `LiteLLMService` or by implementing the `LLMService` interface.

See the details of LiteLLM here: https://github.com/BerriAI/litellm


### Voice Text Tag (Think Before Answering)

By setting `voice_text_tag`, you can have the LLM "think before answering" (Chain-of-Thought) while vocalizing only the answer portion. You can specify a single tag or a list of tags.

```python
# Single tag: vocalize only <answer> content
llm = ChatGPTService(
    system_prompt="Think within <think> tags. Write your answer within <answer> tags.",
    voice_text_tag="answer"
)

# Multiple tags: vocalize both <ack> and <answer>, skip <think>
llm = ChatGPTService(
    system_prompt="Output <ack>first reaction</ack><think>reasoning</think><answer>full response</answer>",
    voice_text_tag=["ack", "answer"]
)
```
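
To make the effect of `voice_text_tag` concrete, here is a minimal sketch of the idea: keep only the text inside the listed tags and drop everything else. It is not AIAvatarKit's implementation (the library works on streamed chunks, while this sketch assumes the full response is already available), and `extract_voice_text` is a hypothetical helper name used only for illustration.

```python
import re
from typing import Union

def extract_voice_text(llm_output: str, tags: Union[str, list]) -> str:
    """Return only the text inside the given tags, in order of appearance."""
    tag_list = [tags] if isinstance(tags, str) else list(tags)
    # Builds a pattern such as <(ack|answer)>(.*?)</\1>
    names = "|".join(re.escape(tag) for tag in tag_list)
    pattern = re.compile(rf"<({names})>(.*?)</\1>", re.DOTALL)
    return " ".join(m.group(2).strip() for m in pattern.finditer(llm_output))

output = "<ack>Sure!</ack><think>User wants the time.</think><answer>It is noon.</answer>"
print(extract_voice_text(output, "answer"))
print(extract_voice_text(output, ["ack", "answer"]))
```

With a single tag only the answer is kept. With both tags the acknowledgement and the answer are kept, and the `<think>` content is never vocalized.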


## 🗣️　Voice

You can set the speaker ID and the base URL of the VOICEVOX server when instantiating `AIAvatar`.

```python
aiavatar_app = AIAvatar(
    openai_api_key="YOUR_OPENAI_API_KEY",
    # 46 is Sayo. See http://127.0.0.1:50021/speakers to get all ids for characters
    voicevox_speaker=46
)
```

To configure TTS in detail, create an instance of `VoicevoxSpeechSynthesizer` with custom parameters and pass it to `AIAvatar`.
Here is an example for [AivisSpeech](https://aivis-project.com).

```python
# Create VoicevoxSpeechSynthesizer with AivisSpeech configurations
from aiavatar.sts.tts.voicevox import VoicevoxSpeechSynthesizer
tts = VoicevoxSpeechSynthesizer(
    base_url="http://127.0.0.1:10101",  # Your AivisSpeech API server
    speaker="888753761"   # Anneli
)

# Create AIAvatar with VoicevoxSpeechSynthesizer
aiavatar_app = AIAvatar(
    tts=tts,
    openai_api_key=OPENAI_API_KEY   # API Key for LLM and STT
)
```

You can also use alternative Text-to-Speech services. We support Azure, Google, OpenAI, and any other TTS service supported by [SpeechGateway](https://github.com/uezo/speech-gateway), such as Style-Bert-VITS2 and Aivis Cloud API.

```python
from aiavatar.sts.tts.azure import AzureSpeechSynthesizer
from aiavatar.sts.tts.google import GoogleSpeechSynthesizer
from aiavatar.sts.tts.openai import OpenAISpeechSynthesizer
from aiavatar.sts.tts.speech_gateway import SpeechGatewaySpeechSynthesizer
```

### Instant TTS Synthesizer

For quick setup of custom TTS services with HTTP API endpoints, use `create_instant_synthesizer`. This allows you to create a TTS synthesizer with just HTTP request parameters.

Examples:

```python
from aiavatar.sts.tts import create_instant_synthesizer

# Style-Bert-VITS2 API
sbv2_tts = create_instant_synthesizer(
    method="POST",
    url="http://127.0.0.1:5000/voice",
    json={
        "model_id": "0",
        "speaker_id": "0",
        "text": "{text}"  # Placeholder for processed text
    }
)

# ElevenLabs
elevenlabs_tts = create_instant_synthesizer(
    method="POST",
    url=f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}",
    headers={
        "xi-api-key": ELEVENLABS_API_KEY
    },
    json={
        "text": "{text}",
        "model_id": "eleven_v3",
        "output_format": "pcm_16000"
    }
)

# Aivis Cloud API
from aiavatar.sts.tts import AudioConverter
aivis_tts = create_instant_synthesizer(
    method="POST",
    url="https://api.aivis-project.com/v1/tts/synthesize",
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Bearer {AIVIS_API_KEY}"
    },
    json={
        "model_uuid": "22e8ed77-94fe-4ef2-871f-a86f94e9a579",   # Kohaku
        "text": "{text}"
    },
    response_parser=AudioConverter(debug=True).convert
)

# Kotodama API (Implement `make_request` to apply style or language.)
import base64
async def base64_to_bytes(http_response) -> bytes:
    response_json = http_response.json()
    b64audio = response_json["audios"][0]
    return base64.b64decode(b64audio)

kotodama_tts = create_instant_synthesizer(
    method="POST",
    url="https://tts3.spiral-ai-app.com/api/tts_generate",
    headers={
        "Content-Type": "application/json",
        "X-API-Key": KOTODAMA_API_KEY
    },
    json={
        "text": "{text}",
        "speaker_id": "Marlo",
        "decoration_id": "neutral",
        "audio_format": "wav"
    },
    response_parser=base64_to_bytes
)

# Coefont
import hmac
import hashlib
import json
from datetime import datetime, timezone
import httpx

def make_coefont_request(text: str, style_info: dict, language: str):
    date = str(int(datetime.now(tz=timezone.utc).timestamp()))

    data = json.dumps({
        "coefont": "33e0a2ff-5050-434c-9506-defe97e52f15",  # Yuko Goto
        "text": text
    })

    signature = hmac.new(
        key=bytes(COEFONT_ACCESS_SECRET, "utf-8"),
        msg=(date+data).encode("utf-8"),
        digestmod=hashlib.sha256
    ).hexdigest()

    return httpx.Request(
        method="post",
        url="https://api.coefont.cloud/v2/text2speech",
        headers={
            "Content-Type": "application/json",
            "Authorization": COEFONT_ACCESS_KEY,
            "X-Coefont-Date": date,
            "X-Coefont-Content": signature
        },
        data=data
    )

tts = create_instant_synthesizer(
    request_maker=make_coefont_request,
    follow_redirects=True
)

# Amazon Polly (AWS)
import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

region = "ap-northeast-1"
voice_id = "Mizuki"

session = boto3.Session()
# Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY as environment variables
credentials = session.get_credentials().get_frozen_credentials()

convert_pcm_to_wave = AudioConverter(input_sample_rate=16000).pcm_to_wave

def aws_polly_request_maker(text, style_info=None, language=None):
    url = f"https://polly.{region}.amazonaws.com/v1/speech"
    body = json.dumps({
        "OutputFormat": "pcm",
        "SampleRate": "16000",
        "Text": text,
        "VoiceId": voice_id,
    })
    aws_request = AWSRequest(method="POST", url=url, data=body, headers={"Content-Type": "application/json"})
    SigV4Auth(credentials, "polly", region).add_auth(aws_request)
    return httpx.Request(method="POST", url=url, headers=dict(aws_request.headers), content=body)

tts = create_instant_synthesizer(
    request_maker=aws_polly_request_maker,
    response_parser=convert_pcm_to_wave,
)

# COEIROINK
tts = create_instant_synthesizer(
    method="POST",
    url="http://127.0.0.1:50032/v1/synthesis",
    headers={"Content-Type": "application/json"},
    json={
        "speakerUuid": "3c37646f-3881-5374-2a83-149267990abc",  # Tsukuyomi-chan
        "styleId": 0,
        "text": "{text}",
        "volumeScale": 1.0,
        "pitchScale": 0.0,
        "intonationScale": 1.0,
        "prePhonemeLength": 0.0,
        "postPhonemeLength": 0.0,
        "outputSamplingRate": 16000,
        "speedScale": 1.0,
    },
    cache_dir="ttscache/coeiroink/tsukuyomi-chan",
)
```

The `{text}` and `{language}` placeholders in params, headers, and json will be automatically replaced with the processed text and language values during synthesis.
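
As a rough illustration of this substitution, the sketch below walks a request template and fills in the two placeholders. It is an assumption about the general approach, not the library's actual code, and `fill_placeholders` is a hypothetical name.

```python
import re
from typing import Any

def fill_placeholders(value: Any, text: str, language: str) -> Any:
    """Recursively replace {text} and {language} in strings, dicts, and lists."""
    values = {"text": text, "language": language}
    if isinstance(value, str):
        # Single pass, so braces inside the inserted text are never re-expanded
        return re.sub(r"\{(text|language)\}", lambda m: values[m.group(1)], value)
    if isinstance(value, dict):
        return {k: fill_placeholders(v, text, language) for k, v in value.items()}
    if isinstance(value, list):
        return [fill_placeholders(v, text, language) for v in value]
    return value  # numbers, booleans, and None are left unchanged

template = {"model_id": "0", "text": "{text}", "lang": "{language}", "speed": 1.0}
filled = fill_placeholders(template, "こんにちは", "ja-JP")
print(filled)
```

The template itself is not modified, so the same dictionary can be reused for every synthesis request.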


You can also make custom TTS components by implementing the `SpeechSynthesizer` interface.

### TTS Caching

All TTS synthesizers support optional response caching. When `cache_dir` is set, synthesized audio is saved to disk and reused for identical requests, avoiding redundant API calls.

```python
tts = AzureSpeechSynthesizer(
    azure_api_key=AZURE_API_KEY,
    azure_region=AZURE_REGION,
    speaker="ja-JP-MayuNeural",
    cache_dir="./tts_cache/azure",  # Enable caching
    cache_ext="wav",                # File extension (default: "wav")
)
```

- Cache files are stored as `{sha256_hash}.{cache_ext}` in the specified directory
- The hash is computed from all request parameters (URL, headers, body, etc.)
- Set `cache_dir=None` (default) to disable caching
- Works with all TTS classes: Azure, OpenAI, Google, Voicevox, and InstantSynthesizer
- `SpeechGatewaySpeechSynthesizer` does not use this cache as it caches on the gateway side
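
The file naming scheme above can be sketched as follows. This is only an illustration of deriving a stable file name from request parameters. How AIAvatarKit actually serializes the parameters before hashing is not documented here, so the JSON serialization and the `cache_path` helper are assumptions.

```python
import hashlib
import json
from pathlib import Path

def cache_path(cache_dir: str, cache_ext: str, **request_params) -> Path:
    """Map request parameters to a deterministic cache file path."""
    # sort_keys makes the digest independent of dictionary ordering
    serialized = json.dumps(request_params, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
    return Path(cache_dir) / f"{digest}.{cache_ext}"

a = cache_path("./tts_cache", "wav", url="http://127.0.0.1:50021", json={"text": "hello", "speaker": 46})
b = cache_path("./tts_cache", "wav", json={"speaker": 46, "text": "hello"}, url="http://127.0.0.1:50021")
c = cache_path("./tts_cache", "wav", url="http://127.0.0.1:50021", json={"text": "bye", "speaker": 46})
print(a.name)
```

Here `a` and `b` describe the same request and map to the same file, while `c` differs in its text and gets its own file.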

### Preprocessing

AIAvatarKit provides text preprocessing functionality that transforms text before Text-to-Speech processing. This enables improved speech quality and conversion of specific text patterns.

#### Alphabet to Katakana Conversion

A preprocessor that converts alphabetic text to katakana using an LLM. It supports a `kana_map` that stores word-to-reading mappings, which reduces latency for repeated words.

```python
from aiavatar.sts.tts.preprocessor.alphabet2kana import AlphabetToKanaPreprocessor

# Create preprocessor with kana_map for pre-registered word-reading mappings
alphabet2kana_preproc = AlphabetToKanaPreprocessor(
    openai_api_key=OPENAI_API_KEY,
    model="gpt-4o-mini",                      # Model to use (default: gpt-4.1-mini)
    alphabet_length=3,                        # Minimum alphabet length to convert (default: 3)
    special_chars=".'-'−–",                   # Characters that connect words (default: ".'-'−–")
    use_kana_map=True,                        # Enable kana_map mode (default: True)
    kana_map={"GitHub": "ギットハブ"},         # Pre-registered word-reading mappings (optional)
    debug=True,                               # Enable debug logging (default: False)
)

# Add to TTS
tts.preprocessors.append(alphabet2kana_preproc)

# Words converted by LLM are automatically added to kana_map
# You can persist and restore kana_map for future sessions:
import json
# Save
with open("kana_map.json", "w") as f:
    json.dump(alphabet2kana_preproc.kana_map, f, ensure_ascii=False)
# Load
with open("kana_map.json") as f:
    kana_map = json.load(f)
```

Key features:
- **kana_map**: Pre-register known word-reading mappings and automatically add LLM results to avoid repeated API calls
- **special_chars**: Words containing these characters (e.g., `Mr.`, `You're`, `Wi-Fi`) are always processed regardless of `alphabet_length`
- **Case-insensitive**: Matches `API`, `api`, and `Api` with a single kana_map entry
- **debug mode**: Logs `[KanaMap]` for cached hits and `[LLM]` for new readings with elapsed time
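
The caching behavior described above can be pictured with a small sketch. It is not the preprocessor's real code: `KanaCache` is a hypothetical class, and the `convert` callable stands in for the LLM request.

```python
from typing import Callable, Optional

class KanaCache:
    """Case-insensitive word-to-reading cache with a fallback converter."""

    def __init__(self, convert: Callable[[str], str], initial: Optional[dict] = None):
        self._convert = convert  # stand-in for the LLM call
        self._map = {k.lower(): v for k, v in (initial or {}).items()}
        self.fallback_calls = 0

    def reading(self, word: str) -> str:
        key = word.lower()
        if key not in self._map:
            # Unknown word: ask the converter once and remember the result
            self.fallback_calls += 1
            self._map[key] = self._convert(word)
        return self._map[key]

cache = KanaCache(convert=lambda w: f"<{w}>", initial={"GitHub": "ギットハブ"})
print(cache.reading("github"))  # served from the pre-registered entry
print(cache.reading("API"))     # first occurrence goes to the converter
print(cache.reading("api"))     # same entry, no second converter call
```

Only the first lookup of an unknown word reaches the converter. Later lookups, in any letter case, are answered from the map.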

#### Pattern Match Conversion

You can also use regular expressions and string patterns for conversion:

```python
from aiavatar.sts.tts.preprocessor.patternmatch import PatternMatchPreprocessor

# Create pattern match preprocessor
pattern_preproc = PatternMatchPreprocessor(patterns=[
    ("API", "エーピーアイ"),               # Fixed string replacement
    ("URL", "ユーアールエル"),
    (r"\d+", lambda m: "number"),          # Regex replacement with function
])

# Add common patterns
pattern_preproc.add_number_dash_pattern()  # Number-dash patterns (e.g., 12-34 → イチニの サンヨン)
pattern_preproc.add_phonenumber_pattern()  # Phone number patterns

# Add to TTS
tts.preprocessors.append(pattern_preproc)
```

#### Creating Custom Preprocessors

You can create your own preprocessors by implementing the `TTSPreprocessor` interface:

```python
from aiavatar.sts.tts.preprocessor import TTSPreprocessor

class CustomPreprocessor(TTSPreprocessor):
    def __init__(self, custom_dict: dict = None):
        self.custom_dict = custom_dict or {}
    
    async def process(self, text: str, style_info: dict = None, language: str = None) -> str:
        # Custom conversion logic
        processed_text = text
        
        # Dictionary-based replacement
        for original, replacement in self.custom_dict.items():
            processed_text = processed_text.replace(original, replacement)
        
        # Language-specific conversions
        if language == "ja-JP":
            processed_text = processed_text.replace("OK", "オーケー")
        
        return processed_text

# Use custom preprocessor
custom_preproc = CustomPreprocessor(custom_dict={
    "GitHub": "ギットハブ",
    "Python": "パイソン",
    "Docker": "ドッカー"
})

tts.preprocessors.append(custom_preproc)
```

#### Combining Preprocessors

Multiple preprocessors can be used together. They are executed in the order they were registered:

```python
# Combine multiple preprocessors
tts.preprocessors.extend([
    pattern_preproc,        # 1. Pattern match conversion
    alphabet2kana_preproc,  # 2. Alphabet to katakana conversion
    custom_preproc          # 3. Custom conversion
])
```
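
Because each preprocessor receives the output of the previous one, registration order can change the result. The sketch below shows this with stand-in classes that follow the `process` signature shown earlier. `ReplacePreprocessor` and `run_preprocessors` are hypothetical names, and the loop is an assumption about how a chain like this is typically applied.

```python
import asyncio

class ReplacePreprocessor:
    """Hypothetical stand-in for a TTSPreprocessor; replaces one substring."""

    def __init__(self, old: str, new: str):
        self.old = old
        self.new = new

    async def process(self, text: str, style_info: dict = None, language: str = None) -> str:
        return text.replace(self.old, self.new)

async def run_preprocessors(preprocessors, text, style_info=None, language=None):
    # Each preprocessor receives the output of the previous one
    for preprocessor in preprocessors:
        text = await preprocessor.process(text, style_info, language)
    return text

expand = ReplacePreprocessor("API", "エーピーアイ")
mask = ReplacePreprocessor("エーピーアイ", "***")

expand_then_mask = asyncio.run(run_preprocessors([expand, mask], "API"))
mask_then_expand = asyncio.run(run_preprocessors([mask, expand], "API"))
print(expand_then_mask)
print(mask_then_expand)
```

In the first ordering the expanded reading is masked afterwards. In the second, the mask finds nothing to replace and the reading survives.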


### Adjusting Speech Speed

With `SpeechGatewaySpeechSynthesizer`, you can change the speech speed per session by setting the speed either on the entire instance or in `style_info`.

Here is an example of storing the speech speed as `tts_speed` in session data when using WebSocketAdapter.

```python
# Apply speech speed per session
from aiavatar.sts.llm import LLMResponse
@aiavatar_app.sts.process_llm_chunk
async def process_llm_chunk(llm_stream_chunk: LLMResponse, session_id: str, user_id: str) -> dict:
    if session_data := aiavatar_app.sessions.get(session_id):
        if speed := session_data.data.get("tts_speed"):
            return {"speed": float(speed)}
```

NOTE: To configure `tts_speed`, you can either set up a REST API endpoint to update it directly, or use control tags included in responses to update it.


## 👂 Speech Listener

To configure STT in detail, create an instance of a `SpeechRecognizer` implementation with custom parameters and pass it to `AIAvatar`. We support Azure, Google and OpenAI Speech-to-Text services.

NOTE: **`AzureSpeechRecognizer` is much faster** than Google and OpenAI (default).

```python
# Create AzureSpeechRecognizer
from aiavatar.sts.stt.azure import AzureSpeechRecognizer
stt = AzureSpeechRecognizer(
    azure_api_key=AZURE_API_KEY,
    azure_region=AZURE_REGION
)

# Create AIAvatar with AzureSpeechRecognizer
aiavatar_app = AIAvatar(
    stt=stt,
    openai_api_key=OPENAI_API_KEY   # API Key for LLM
)
```

You can also make custom STT components by implementing the `SpeechRecognizer` interface.

### Preprocessing and Postprocessing

You can add custom preprocessing and postprocessing to any `SpeechRecognizer` implementation. This is useful for tasks like speaker verification, audio filtering, or text normalization.

```python
from aiavatar.sts.stt.openai import OpenAISpeechRecognizer

# Create recognizer
recognizer = OpenAISpeechRecognizer(openai_api_key="your-api-key")

# Add preprocessing - e.g., speaker verification
@recognizer.preprocess
async def verify_speaker(session_id: str, audio_data: bytes):
    # Perform speaker verification
    is_valid_speaker = await check_speaker_identity(audio_data)
    
    if not is_valid_speaker:
        # Return empty bytes to skip transcription
        return b"", {"rejected": True, "reason": "speaker_mismatch"}
    
    # Return processed audio and metadata
    filtered_audio = apply_noise_filter(audio_data)
    return filtered_audio, {"speaker_verified": True, "session_id": session_id}

# Add postprocessing - e.g., text formatting
@recognizer.postprocess
async def format_text(session_id: str, text: str, audio_data: bytes, preprocess_metadata: dict):
    # Format transcribed text
    formatted_text = text.strip().capitalize()
    
    # Add punctuation if missing
    if formatted_text and formatted_text[-1] not in '.!?':
        formatted_text += '.'
    
    # Return formatted text and metadata
    return formatted_text, {
        "original_text": text,
        "formatting_applied": True,
        "preprocess_info": preprocess_metadata
    }

# Use the recognizer with preprocessing and postprocessing
result = await recognizer.recognize(
    session_id="user-123",
    data=audio_bytes
)

print(f"Text: {result.text}")
print(f"Preprocess metadata: {result.preprocess_metadata}")
print(f"Postprocess metadata: {result.postprocess_metadata}")
```

The preprocessing and postprocessing functions can return either:
- Just the processed data (bytes for preprocess, string for postprocess)
- A tuple of (processed_data, metadata_dict) for additional information

If preprocessing returns empty bytes, the transcription is skipped and the result will have `text=None`.
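
The two accepted return shapes can be handled with a small normalization step like the one below. This is a sketch of the convention only, with hypothetical helper names. The library's internal handling, including what it uses for missing metadata, may differ.

```python
def normalize_hook_result(result):
    """Return (data, metadata) whether the hook returned data or a tuple."""
    if isinstance(result, tuple):
        data, metadata = result
        return data, metadata
    return result, None

def should_skip_transcription(audio) -> bool:
    # Empty bytes signal that there is nothing to transcribe
    return not audio

print(normalize_hook_result(b"abc"))
print(normalize_hook_result((b"", {"rejected": True})))
print(should_skip_transcription(b""))
```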


### Speaker Diarization

AIAvatarKit provides speaker diarization functionality to suppress responses to voices other than the main speaker. This prevents interruptions from surrounding conversations or venue announcements at events.

The `MainSpeakerGate` provides the following features:

- Calculates voice embeddings from request audio
- Registers a voice as the main speaker when similarity exceeds threshold for 2 consecutive requests (per session)
- Returns `accepted=True` when request audio similarity exceeds threshold after main speaker registration
- Returns `accepted=True` when no main speaker is registered yet

**NOTE:** While mechanically ignoring non-main speaker voices (Example 1) is simplest, it risks stopping conversation due to misidentification and cannot handle speaker changes. Consider context-aware handling (Example 2) as well.

```python
import logging
from aiavatar.sts.stt.speaker_gate import MainSpeakerGate

logger = logging.getLogger(__name__)
speaker_gate = MainSpeakerGate()

# Example 1: Drop request when the voice is not from main speaker
@aiavatar_app.sts.stt.preprocess
async def stt_preprocess(session_id: str, audio_bytes: bytes):
    # Compare with main speaker's voice embedding
    gate_response = await speaker_gate.evaluate(session_id, audio_bytes, aiavatar_app.sts.vad.sample_rate)
    # Branch processing based on similarity with main speaker's voice
    if not gate_response.accepted:
        logger.info(f"Ignore other speaker's voice: confidence={gate_response.confidence}")
        return None, gate_response.to_dict()
    else:
        return audio_bytes, gate_response.to_dict()

# Example 2: Add annotation for LLM that the voice is not from main speaker
@aiavatar_app.sts.stt.postprocess
async def stt_postprocess(session_id: str, text: str, audio_bytes: bytes, preprocess_metadata: dict):
    # Compare with main speaker's voice embedding
    gate_response = await speaker_gate.evaluate(session_id, audio_bytes, aiavatar_app.sts.vad.sample_rate)
    # Branch processing based on similarity with main speaker's voice
    if not gate_response.accepted:
        logger.info(f"Adding note that this may be from a different speaker: confidence={gate_response.confidence}")
        return f"$The following request may not be from the main speaker (similarity: {gate_response.confidence}). Determine from the content whether to respond. If you should not respond, output just [wait:user] as the answer:\n\n{text}", gate_response.to_dict()
    else:
        return text, gate_response.to_dict()
```
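
For intuition, the gating rules listed above can be sketched with plain vectors in place of real voice embeddings. This is one possible reading of those rules, not `MainSpeakerGate`'s implementation: `SimpleSpeakerGate`, the cosine similarity measure, and the threshold value are all assumptions.

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

class SimpleSpeakerGate:
    def __init__(self, threshold: float = 0.8):
        self.threshold = threshold
        self.main = {}      # session_id -> registered main-speaker embedding
        self.previous = {}  # session_id -> embedding of the previous request

    def evaluate(self, session_id: str, embedding) -> bool:
        if session_id in self.main:
            # After registration, accept only voices close to the main speaker
            return cosine(self.main[session_id], embedding) >= self.threshold
        previous = self.previous.get(session_id)
        self.previous[session_id] = embedding
        # Two consecutive similar requests register the main speaker
        if previous is not None and cosine(previous, embedding) >= self.threshold:
            self.main[session_id] = embedding
        return True  # no main speaker yet, so everything is accepted

gate = SimpleSpeakerGate()
alice_1, alice_2, bob = [1.0, 0.0], [0.9, 0.1], [0.0, 1.0]
results = [gate.evaluate("s1", v) for v in (alice_1, alice_2, bob, alice_1)]
print(results)
```

The first two requests register the main speaker, the third (a different voice) is rejected, and the fourth is accepted again.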


## 🎙️ Speech Detector

AIAvatarKit includes Voice Activity Detection (VAD) components to automatically detect when speech starts and ends in audio streams. This enables seamless conversation flow without manual input controls.

### Silero Speech Detector

The default Speech Detector is `SileroSpeechDetector`, which employs AI-based voice activity detection using the Silero VAD model:

```python
from aiavatar.sts.vad.silero import SileroSpeechDetector

vad = SileroSpeechDetector(
    speech_probability_threshold=0.5,    # AI model confidence threshold (0.0-1.0)
    silence_duration_threshold=0.5,      # Seconds of silence to end recording
    volume_db_threshold=None,            # Optional: filter by volume in dB (e.g., -30.0)
    max_duration=10.0,                   # Maximum recording duration
    min_duration=0.2,                    # Minimum recording duration
    sample_rate=16000,                   # Audio sample rate
    channels=1,                          # Audio channels
    chunk_size=512,                      # Audio processing chunk size
    model_pool_size=1,                   # Number of parallel AI models
    debug=True
)

aiavatar_app = AIAvatar(vad=vad, openai_api_key=OPENAI_API_KEY)
```
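
To show how these thresholds interact, here is a simplified sketch that turns a list of per-chunk speech probabilities into speech segments. It is not the detector's internal logic: in practice the probabilities come from the Silero VAD model, and `find_speech_segments` is a hypothetical helper that measures `min_duration` over the speech span only.

```python
import math

def find_speech_segments(probabilities, speech_probability_threshold=0.5,
                         silence_duration_threshold=0.5, min_duration=0.2,
                         max_duration=10.0, sample_rate=16000, chunk_size=512):
    """Return (start_chunk, last_speech_chunk) pairs for detected speech."""
    chunk_sec = chunk_size / sample_rate  # 512 / 16000 = 0.032 s per chunk
    silence_limit = math.ceil(silence_duration_threshold / chunk_sec)
    min_chunks = math.ceil(min_duration / chunk_sec)
    max_chunks = int(max_duration / chunk_sec)

    segments = []
    start = last_speech = None
    for i, p in enumerate(probabilities):
        if p >= speech_probability_threshold:
            if start is None:
                start = i
            last_speech = i
        if start is None:
            continue  # still waiting for speech to begin
        silence = i - last_speech
        length = i - start + 1
        if silence >= silence_limit or length >= max_chunks:
            # Keep the segment only if the speech part is long enough
            if last_speech - start + 1 >= min_chunks:
                segments.append((start, last_speech))
            start = last_speech = None
    return segments

probs = [0.1] * 5 + [0.9] * 10 + [0.1] * 20 + [0.9] * 3 + [0.1] * 20
segments = find_speech_segments(probs)
print(segments)
```

The 10-chunk burst is kept because it exceeds `min_duration`, while the 3-chunk burst is discarded as too short. A segment that is still open when the input ends is not emitted in this sketch.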

For high-concurrency applications:

```python
vad = SileroSpeechDetector(
    speech_probability_threshold=0.6,    # Stricter threshold for noisy environments
    model_pool_size=4,                   # 4 parallel AI models for load balancing
    debug=False
)
```

To use a local Silero VAD model file instead of downloading from the hub, set `model_path`:

```python
vad = SileroSpeechDetector(
    model_path="path/to/silero_vad.jit"
)
```


### Silero Stream Speech Detector

`SileroStreamSpeechDetector` extends `SileroSpeechDetector` with segment-based speech recognition. It performs partial transcription during recording, allowing you to receive intermediate results before the final transcription.

```python
from aiavatar.sts.vad.stream import SileroStreamSpeechDetector
from aiavatar.sts.stt.google import GoogleSpeechRecognizer

vad = SileroStreamSpeechDetector(
    speech_recognizer=GoogleSpeechRecognizer(...),
    segment_silence_threshold=0.2,       # Silence duration to trigger segment recognition
    silence_duration_threshold=0.5,      # Silence duration to finalize recording
    # Inherits all SileroSpeechDetector parameters
)
```

#### Segment Recognition Callback

The `on_speech_detecting` callback is triggered when a speech segment is recognized:

```python
@vad.on_speech_detecting
async def on_speech_detecting(text, session):
    print(f"Partial text: {text}")

    # For WebSocket apps, send partial text to client via info message
    # resp = STSResponse(
    #     type="info",
    #     session_id=session.session_id,
    #     metadata={"partial_request_text": text}
    # )
    # await ws_app.handle_response(resp)
```

#### Text Validation

Use `validate_recognized_text` to filter out invalid recognition results:

```python
@vad.validate_recognized_text
def validate(text):
    if len(text) < 2:
        return "Text too short"  # Return error message to reject
    return None  # Return None to accept
```


### Azure Stream Speech Detector

`AzureStreamSpeechDetector` uses Azure's streaming speech recognition service for both speech detection and transcription. Audio is continuously streamed to Azure, and speech boundaries are determined by Azure's recognition events.

```sh
pip install azure-cognitiveservices-speech
```

```python
from aiavatar.sts.vad.azure_stream import AzureStreamSpeechDetector

vad = AzureStreamSpeechDetector(
    azure_subscription_key=AZURE_API_KEY,
    azure_region=AZURE_REGION
)
```

This detector also supports the `on_speech_detecting` callback for partial transcription results:

```python
@vad.on_speech_detecting
async def on_speech_detecting(text, session):
    print(f"Partial text: {text}")

    # For WebSocket apps, send partial text to client via info message
    # resp = STSResponse(
    #     type="info",
    #     session_id=session.session_id,
    #     metadata={"partial_request_text": text}
    # )
    # await ws_app.handle_response(resp)
```

### AWS Stream Speech Detector

`AmazonTranscribeStreamSpeechDetector` uses Amazon Transcribe's streaming speech recognition service for both speech detection and transcription. Audio is continuously streamed to Amazon Transcribe, and speech boundaries are determined by the recognition results combined with a configurable silence duration threshold.

```sh
pip install amazon-transcribe
```

```python
from aiavatar.sts.vad.amazon_transcribe_stream import AmazonTranscribeStreamSpeechDetector

vad = AmazonTranscribeStreamSpeechDetector(
    aws_region="ap-northeast-1",
    aws_access_key_id=AWS_ACCESS_KEY_ID,         # Optional: uses default credential chain if omitted
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,  # Optional: uses default credential chain if omitted
    aws_language="ja-JP",
    silence_duration_threshold=0.5,  # Seconds of silence after last recognition to finalize
    max_duration=20.0,               # Maximum recording duration in seconds
)
```

When `silence_duration_threshold > 0`, multiple recognition results from Amazon Transcribe are accumulated into a single speech detection event. A silence timer starts after each final result, and if new speech arrives before the timer expires, the timer is cancelled and transcription continues. This allows natural pauses within a sentence without splitting the utterance.

> **Note:** The `silence_duration_threshold` timer starts from when Amazon Transcribe returns a final recognition result, not from when the user actually stops speaking. Since Amazon Transcribe takes some time to process audio and return a final result, the actual delay from the user's perspective is: **Transcribe processing delay + `silence_duration_threshold`**. For example, if Transcribe takes ~0.5s to return a final result and `silence_duration_threshold=0.5`, the total delay from the end of speech to firing `on_speech_detected` will be approximately 1.0s.

When `max_duration` is reached during recording, if there are accumulated recognition results, speech detection is triggered immediately with the combined text.
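
The accumulation behavior can be modeled without any AWS dependency. The sketch below is illustrative only and is not the detector's actual implementation: `accumulate_utterances`, the `(timestamp, is_final, text)` event tuples, and the `separator` parameter are hypothetical names introduced here. It replays timestamped recognition events and shows how a silence timer that restarts after every final result merges several results into one utterance.

```python
def accumulate_utterances(events, silence_duration_threshold=0.5, separator=" "):
    """Group final recognition results into utterances.

    events: iterable of (timestamp_sec, is_final, text), sorted by time.
    Returns a list of (fired_at_sec, combined_text).
    """
    utterances = []
    texts = []        # final results accumulated so far
    deadline = None   # time at which the pending silence timer would expire

    for timestamp, is_final, text in events:
        # The timer expired before this event arrived: emit the utterance.
        if deadline is not None and timestamp >= deadline:
            utterances.append((deadline, separator.join(texts)))
            texts = []
        # Any new speech (partial or final) cancels a pending timer.
        deadline = None
        if is_final:
            texts.append(text)
            # A final result (re)starts the silence timer.
            deadline = timestamp + silence_duration_threshold

    # Flush the last utterance if its timer was still pending.
    if deadline is not None:
        utterances.append((deadline, separator.join(texts)))
    return utterances


events = [
    (1.0, True, "Hello"),         # final result, timer starts
    (1.3, False, "how"),          # partial result arrives in time, timer cancelled
    (2.0, True, "how are you"),   # final result, timer restarts
    (3.0, True, "Bye"),           # arrives after the timer expired
]
utterances = accumulate_utterances(events)
```

In this model the first two final results are combined because speech resumed within the threshold, while the last one becomes a separate utterance. The fire times are measured from when the final result arrives, which matches the delay described in the note above.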

This detector also supports the `on_speech_detecting` callback for partial transcription results. When texts have been accumulated from previous final results, they are prepended to the current partial text:

```python
@vad.on_speech_detecting
async def on_speech_detecting(text, session):
    print(f"Partial text: {text}")

    # For WebSocket apps, send partial text to client via info message
    # resp = STSResponse(
    #     type="info",
    #     session_id=session.session_id,
    #     metadata={"partial_request_text": text}
    # )
    # await ws_app.handle_response(resp)
```

Use `validate_recognized_text` to filter out invalid recognition results:

```python
@vad.validate_recognized_text
def validate(text):
    if len(text) < 2:
        return "Text too short"  # Return error message to reject
    return None  # Return None to accept
```


### Customization

#### on_recording_started Callback

The `on_recording_started` callback is triggered when recording has been active long enough to be considered meaningful speech. This is useful for stopping AI speech when the user starts talking.

```python
# Option 1: Pass callback in constructor
async def my_recording_started_handler(session_id: str):
    print(f"Recording started for session: {session_id}")
    await stop_ai_speech()

vad = SileroSpeechDetector(
    on_recording_started=my_recording_started_handler,
    on_recording_started_min_duration=1.5,    # Trigger after 1.5 sec of speech (default)
    # other parameters...
)

# Option 2: Use decorator
@vad.on_recording_started
async def on_recording_started(session_id):
    await stop_ai_speech()
```

For stream-based detectors (`SileroStreamSpeechDetector`, `AzureStreamSpeechDetector`), the callback can also be triggered by recognized text length:

```python
vad = SileroStreamSpeechDetector(
    speech_recognizer=speech_recognizer,
    on_recording_started_min_duration=1.5,    # Trigger after 1.5 sec of speech
    on_recording_started_min_text_length=2,   # OR trigger when text >= 2 chars
)
```

#### Custom Trigger Condition

You can customize when `on_recording_started` fires using the `should_trigger_recording_started` decorator:

```python
@vad.should_trigger_recording_started
def custom_trigger(text, session):
    # text: Recognized text (None for non-stream detectors)
    # session: Recording session object
    # Return True to trigger the callback
    return bool(text) and len(text) >= 5
```


### Standard Speech Detector (Legacy)

`StandardSpeechDetector` uses simple volume-based detection. It is suitable for environments with limited computing resources; otherwise, consider `SileroSpeechDetector` for better accuracy:

```python
from aiavatar.sts.vad.standard import StandardSpeechDetector

vad = StandardSpeechDetector(
    volume_db_threshold=-30.0,           # Voice detection threshold in dB
    silence_duration_threshold=0.5,      # Seconds of silence to end recording
    max_duration=10.0,                   # Maximum recording duration
    min_duration=0.2,                    # Minimum recording duration
    sample_rate=16000,                   # Audio sample rate
    channels=1,                          # Audio channels
    preroll_buffer_count=5,              # Pre-recording buffer size
    debug=True
)
```
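
To get a feel for what a value such as `volume_db_threshold=-30.0` means, the following sketch measures the level of a PCM chunk in dB relative to full scale. It is an assumption-laden illustration, not the library's code: it assumes 16-bit signed little-endian samples and an RMS-based level, and `chunk_volume_db` and `is_voiced` are hypothetical helper names.

```python
import math
import struct


def chunk_volume_db(pcm_bytes: bytes) -> float:
    """Return the RMS level of 16-bit little-endian PCM in dB relative to full scale."""
    sample_count = len(pcm_bytes) // 2
    if sample_count == 0:
        return float("-inf")
    samples = struct.unpack(f"<{sample_count}h", pcm_bytes[: sample_count * 2])
    rms = math.sqrt(sum(s * s for s in samples) / sample_count)
    if rms == 0:
        return float("-inf")  # digital silence
    return 20 * math.log10(rms / 32768.0)


def is_voiced(pcm_bytes: bytes, volume_db_threshold: float = -30.0) -> bool:
    """Treat a chunk as voice when its level exceeds the threshold."""
    return chunk_volume_db(pcm_bytes) > volume_db_threshold
```

Measuring a few chunks from your own microphone this way can help when choosing a threshold for a noisy or quiet environment.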


## 🥰 Face expression

To control facial expressions within conversations, set the facial expression names and values in `FaceController.faces` as shown below, and then include these expression keys in the response message by adding instructions to the prompt.

```python
aiavatar_app.face_controller.faces = {
    "neutral": "🙂",
    "joy": "😀",
    "angry": "😠",
    "sorrow": "😞",
    "fun": "🥳"
}

aiavatar_app.sts.llm.system_prompt = """# Face Expression

* You have the following expressions:

- joy
- angry
- sorrow
- fun

* If you want to express a particular emotion, please insert it at the beginning of the sentence like [face:joy].

Example
[face:joy]Hey, you can see the ocean! [face:fun]Let's go swimming.
"""
```

> **Note:** XML-style tags are also supported: `<face name="joy" />`, `<animation name="wave_hands" />`. Both bracket and XML formats can be used interchangeably.

This displays emojis such as 🥳 in the terminal during conversations. To control an avatar's facial expressions on a metaverse platform instead, implement a custom face controller tailored to that platform's integration mechanism. See `VRChatFaceController` for an example.
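
When writing a custom controller, it can help to see how bracket tags separate from the spoken text. The sketch below is a simplified stand-in for illustration, not AIAvatarKit's own parser: `TAG_PATTERN` and `split_control_tags` are hypothetical names, and only the bracket format is handled (XML-style tags are not).

```python
import re

# Matches bracket tags such as [face:joy] or [animation:wave_hands].
TAG_PATTERN = re.compile(r"\[(\w+):([^\]]+)\]")


def split_control_tags(text: str):
    """Return (voice_text, tags), where tags is a list of (kind, value) pairs."""
    tags = TAG_PATTERN.findall(text)
    voice_text = TAG_PATTERN.sub("", text)
    return voice_text, tags


voice_text, tags = split_control_tags(
    "[face:joy]Hey, you can see the ocean! [face:fun]Let's go swimming."
)
```

Here `voice_text` contains only the sentence to be spoken, and `tags` lists the expressions in the order they appeared.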


## 💃 Animation

Now writing... ✍️


## 🥳 Character Management

`CharacterService` provides functionality for managing AI character settings and generating dynamic content such as schedules and diaries based on character personalities.

Schedules and diaries are generated as if by the character's own will. By updating these daily and incorporating them into prompts, you can make the character feel like they are actually living in real-world time.

**Note:** This feature requires PostgreSQL as the database backend.


### Get started

Register a new character using a character setting prompt. Registration also generates both the weekly schedule and today's schedule.

```python
from datetime import date
from aiavatar.character import CharacterService

# Initialize service
character_service = CharacterService(
    openai_api_key="YOUR_API_KEY"
)

# Initialize a new character with weekly and daily schedules
character, weekly, daily = await character_service.initialize_character(
    name="Alice",
    character_prompt="You are Alice, a cheerful high school student who loves reading..."
)

print(f"Character ID: {character.id}")
```

To use the registered and generated content as a system prompt, implement `LLMService.get_system_prompt` as follows:

```python
@llm.get_system_prompt
async def get_system_prompt(context_id: str, user_id: str, system_prompt_params: dict):
    return await character_service.get_system_prompt(
        character_id="YOUR_CHARACTER_ID",
        system_prompt_params=system_prompt_params
    )
```

This system prompt includes not only the character settings from `character_prompt`, but also the schedule for the day.


### Updating Diaries

Diaries can be automatically generated using `create_diary_with_generation`. The following information is used:

- Character settings
- Today's schedule
- Today's news (retrieved via web search)
- Previous day's diary

```python
# Generate diary from daily activities
diary = await character_service.create_diary_with_generation(
    character_id=character.id,
    diary_date=date.today()
)
```

The generated diary can be used as context for the LLM using `GetDiaryTool`. By setting `include_schedule=True`, the schedule information for the day is also retrieved (default is `True`).

```python
from aiavatar.character.tools import GetDiaryTool
llm.add_tool(
    GetDiaryTool(
        character_service=character_service,
        character_id=YOUR_CHARACTER_ID,
        include_schedule=True
    )
)
```


### Updating Schedules

Daily schedules can be automatically generated using `create_daily_schedule_with_generation`. The following information is used:

- Character settings
- Weekly schedule
- Previous day's schedule

```python
daily_schedule = await character_service.create_daily_schedule_with_generation(
    character_id=character.id,
    schedule_date=date.today()
)
```

### Automated Daily Updates

For a more realistic character experience, use a scheduler service (such as cron) to automatically update schedules and diaries:

- **Daily schedule**: Generate at the beginning of each day (e.g., 0:00 or 6:00)
- **Diary**: Generate at the end of each day (e.g., 23:00)

Example cron configuration:

```
# Generate daily schedule at 6:00 AM
0 6 * * * /usr/bin/python3 /path/to/generate_schedule.py

# Generate diary at 11:00 PM
0 23 * * * /usr/bin/python3 /path/to/generate_diary.py
```

Example script for `generate_schedule.py`:

```python
import asyncio
from datetime import date
from aiavatar.character import CharacterService

async def main():
    character_service = CharacterService(
        openai_api_key="YOUR_API_KEY"
    )
    await character_service.create_daily_schedule_with_generation(
        character_id="YOUR_CHARACTER_ID",
        schedule_date=date.today()
    )

asyncio.run(main())
```

### Batch Generation

You can batch generate daily schedules and diaries for a date range using `create_activity_range_with_generation`.

```python
await character_service.create_activity_range_with_generation(
    character_id=YOUR_CHARACTER_ID,
    start_date=date(2026, 1, 8),
    end_date=date(2026, 1, 16),  # Defaults to today if omitted
    overwrite=False,
)
```

This is useful for recovering data when automatic updates were stopped, or for building up initial data when creating a new character.

### Long-term Memory

This feature is **optional**. If you want to make diaries searchable as long-term memory, you can integrate with an external memory service by configuring `MemoryClient`:

```python
from aiavatar.character import CharacterService, MemoryClient

memory_client = MemoryClient(base_url="http://memory-service:8000")

character_service = CharacterService(
    openai_api_key="YOUR_API_KEY",
    memory_client=memory_client
)
```

Once registered, diaries are included in the results returned by the `search` method:

```python
# In addition to diaries, conversation history with users and other knowledge are searched comprehensively
result = await character_service.memory.search(
    character_id="YOUR_CHARACTER_ID",
    user_id="YOUR_USER_ID",
    query="travel summer 2026"
)
```

The default `MemoryClient` uses [ChatMemory](https://github.com/uezo/chatmemory) as its backend, but you can also use other long-term memory services by inheriting from `MemoryClientBase`.


### Binding to Adapter

The `bind_character` function provides a convenient way to integrate character management with your AIAvatar application. It automatically configures the system prompt, user management, and character-related tools in a single call.

```python
from aiavatar.character import CharacterService
from aiavatar.character.binding import bind_character

character_service = CharacterService(
    openai_api_key="YOUR_API_KEY"
)

bind_character(
    adapter=aiavatar_app,
    character_service=character_service,
    character_id="YOUR_CHARACTER_ID",
    default_user_name="You"
)
```

This single function call sets up:

- **System prompt**: Automatically retrieves the character's system prompt with user-specific parameters
- **User management**: Creates a new user with `default_user_name` if the user doesn't exist
- **Username sync**: Sends the username and character name to the client on connection, and updates when changed
- **Tools**: Registers the following tools automatically:
  - `UpdateUsernameTool`: Allows the character to update the user's name during conversation
  - `GetDiaryTool`: Retrieves the character's diary and schedule
  - `MemorySearchTool`: Searches long-term memory (only if `memory_client` is configured)


### CharacterLoader (Lightweight Alternative)

`CharacterLoader` is a lightweight alternative to `CharacterService` that loads character settings from local files instead of a database. No database or external API is required — just plain markdown and JSON files.

This is ideal when you want to quickly set up a character without infrastructure, or when you prefer to manage character definitions as files.

#### Single file mode

The simplest usage is to point to a single markdown file containing the system prompt:

```python
from aiavatar.character.loader import CharacterLoader

loader = CharacterLoader("system_prompt.md")

# Bind to LLM service
loader.bind(adapter.sts.llm)
```

#### Directory mode

For richer character definitions, use directory mode with `split_initial_messages=True`. Initial messages are prepended to the conversation history as pseudo user/assistant turns, allowing you to inject character knowledge (episodes, attributes, conversation examples) without overloading the system prompt. Point to a directory containing:

```
my_character/
├── character.md                # Character settings (required with split_initial_messages)
├── response_instructions.md    # Response rules (optional, appended to system prompt)
├── message_templates.json      # Template definitions for initial messages
├── episode.md                  # Character's past experiences (optional)
├── attribute.md                # Likes, dislikes, personality traits (optional)
└── conversation_example.md     # Example dialogues for tone reference (optional)
```

```python
loader = CharacterLoader(
    "my_character",
    split_initial_messages=True,
    lang="ja",
    user_names={"user_001": "Alice"},
    default_user_name="You"
)

loader.bind(adapter.sts.llm)
```

The `message_templates.json` defines how initial messages and self-introduction are structured:

```json
{
    "initial_message_defs": {
        "ja": {
            "self_intro": "わかりました。{username}さんですね。",
            "episode": "わかりました。",
            "attribute": "わかりました。"
        }
    },
    "prefixes": {
        "ja": {
            "episode": "以下はあなたの過去の経験です。\n\n",
            "attribute": "以下はあなたの属性情報です。\n\n"
        }
    },
    "self_intro_template": {
        "ja": "$ユーザーの名前は{username}です。"
    }
}
```
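
As a rough mental model of how such templates could turn into pseudo user/assistant turns, consider the sketch below. How `CharacterLoader` actually assembles its messages is not shown in this document, so the ordering, the role assignment, and the `build_initial_messages` function itself are assumptions made purely for illustration.

```python
def build_initial_messages(templates: dict, lang: str, username: str, sections: dict) -> list:
    """Expand templates into alternating user/assistant messages.

    sections maps a key such as "episode" to the text of the matching file.
    """
    replies = templates["initial_message_defs"][lang]
    prefixes = templates["prefixes"][lang]

    # Assumed: the self-introduction is sent as a user turn and acknowledged.
    messages = [
        {"role": "user", "content": templates["self_intro_template"][lang].format(username=username)},
        {"role": "assistant", "content": replies["self_intro"].format(username=username)},
    ]
    # Assumed: each knowledge file becomes a prefixed user turn plus an acknowledgement.
    for key, body in sections.items():
        messages.append({"role": "user", "content": prefixes[key] + body})
        messages.append({"role": "assistant", "content": replies[key]})
    return messages
```

Under these assumptions, a character with an `episode.md` file would start each conversation with four pseudo turns before the first real user message.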

#### Hot reload

All files are cached with mtime-based invalidation. Edit any file while the application is running, and changes will be reflected on the next request — no restart needed.
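
The general technique of mtime-based invalidation can be sketched in a few lines. This is a generic illustration rather than `CharacterLoader`'s actual code; `MtimeCachedFile` and its `load_count` attribute are hypothetical names.

```python
import os


class MtimeCachedFile:
    """Cache a text file and reload it only when its modification time changes."""

    def __init__(self, path: str):
        self.path = path
        self._mtime_ns = None
        self._content = None
        self.load_count = 0  # number of times the file was actually read from disk

    def read(self) -> str:
        mtime_ns = os.stat(self.path).st_mtime_ns
        if mtime_ns != self._mtime_ns:
            # The file is new or was modified since the last read: reload it.
            with open(self.path, encoding="utf-8") as f:
                self._content = f.read()
            self._mtime_ns = mtime_ns
            self.load_count += 1
        return self._content
```

Each call costs one `stat` system call when the file is unchanged, which is why this pattern is cheap enough to run on every request.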

#### Custom user name resolution

Use the `@loader.get_user_name` decorator to resolve user names dynamically (e.g., from a database or external service):

```python
@loader.get_user_name
def get_user_name(user_id: str):
    return db.get_username(user_id)
```

#### Custom message formatting

Use the `@loader.format_messages` decorator to post-process initial messages before they are sent to the LLM:

```python
@loader.format_messages
def format_messages(messages):
    # Add timestamps, filter messages, etc.
    return messages
```

#### Comparison with CharacterService

| | CharacterLoader | CharacterService |
|---|---|---|
| Data source | Local files (`.md`, `.json`) | Database (SQLite / PostgreSQL) |
| Dependencies | None (standard library only) | `openai`, database libraries |
| Schedule / Diary generation | Not supported | Auto-generated via LLM |
| Long-term memory | Not supported | Supported via MemoryClient |
| Character tools | Not included | username update, diary, memory search |
| Hot reload | Supported (mtime-based) | Not supported |


## 🧩 API

You can host AIAvatarKit on a server to enable multiple clients to have independent context-aware conversations via RESTful API with streaming responses (Server-Sent Events) and WebSocket.

### 💫 RESTful API (SSE)

Below is the simplest example of a server program:

```python
from fastapi import FastAPI
from aiavatar.adapter.http.server import AIAvatarHttpServer

# AIAvatar
aiavatar_app = AIAvatarHttpServer(
    openai_api_key=OPENAI_API_KEY,
    debug=True
)

# Set up FastAPI app with AIAvatar components
app = FastAPI()
router = aiavatar_app.get_api_router()
app.include_router(router)
```

Save the above code as `server.py` and run it using:

```sh
uvicorn server:app
```


Next is the simplest example of a client program:

```python
import asyncio
from aiavatar.adapter.http.client import AIAvatarHttpClient

aiavatar_app = AIAvatarHttpClient(
    debug=True
)
asyncio.run(aiavatar_app.start_listening(session_id="http_session", user_id="http_user"))
```

Save the above code as `client.py` and run it using:

```sh
python client.py
```

You can now perform voice interactions just like when running locally.


When using the streaming API via HTTP, clients communicate with the server using JSON-formatted requests.

Below is the format for initiating a session:

```json
{
    "type": "start",          // Always `start`
    "session_id": "6d8ba9ac-a515-49be-8bf4-cdef021a169d",
    "user_id": "user_id",
    "context_id": "c37ac363-5c65-4832-aa25-fd3bbbc1b1e7",   // Set null or provided id in `start` response
    "text": "こんにちは",       // If set, audio_data will be ignored         
    "audio_data": "XXXX",     // Base64 encoded audio data
    "files": [
        {
            "type": "image",        // Only `image` is supported for now
            "url": "https://xxx"
        }
    ],
    "metadata": {}
}
```
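
A client might assemble this payload as follows. This is a minimal sketch: `build_start_request` is a hypothetical helper, and it assumes that fields left out of the payload are optional, as in the curl example later in this section.

```python
import base64
import uuid


def build_start_request(user_id, session_id=None, context_id=None, text=None, audio_bytes=None):
    """Build a `start` request dict that can be sent as the JSON body."""
    request = {
        "type": "start",
        "session_id": session_id or str(uuid.uuid4()),
        "user_id": user_id,
        "context_id": context_id,  # None is serialized as null for a new conversation
        "metadata": {},
    }
    if text is not None:
        # Text takes precedence, so audio is not sent at all.
        request["text"] = text
    elif audio_bytes is not None:
        request["audio_data"] = base64.b64encode(audio_bytes).decode("ascii")
    return request
```

Generating the `session_id` with `uuid.uuid4()` is only a convenience here; any identifier that is unique per session would do.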

The server returns responses as a stream of JSON objects with the following structure. A typical exchange consists of a `start` response, one or more `chunk` responses, and a `final` response:

```json
{
    "type": "chunk",    // start -> chunk -> final
    "session_id": "6d8ba9ac-a515-49be-8bf4-cdef021a169d",
    "user_id": "user01",
    "context_id": "c37ac363-5c65-4832-aa25-fd3bbbc1b1e7",
    "text": "[face:joy]こんにちは！",   // Response text with info
    "voice_text": "こんにちは！",       // Response text for voice synthesis
    "avatar_control_request": {
        "animation_name": null,       // Parsed animation name
        "animation_duration": null,   // Parsed duration for animation
        "face_name": "joy",           // Parsed facial expression name
        "face_duration": 4.0          // Parsed duration for the facial expression
    },
    "audio_data": "XXXX",   // Base64 encoded. Play this back as the character's voice.
    "metadata": {
        "is_first_chunk": true
    }
}
```


You can test the streaming API using a simple curl command:

```sh
curl -N -X POST http://127.0.0.1:8000/chat \
    -H "Content-Type: application/json" \
    -d '{
        "type": "start",
        "session_id": "6d8ba9ac-a515-49be-8bf4-cdef021a169d",
        "user_id": "user01",
        "text": "こんにちは"
    }'
```

Sample response (streamed from the server):

```sh
data: {"type": "start", "session_id": "6d8ba9ac-a515-49be-8bf4-cdef021a169d", "user_id": "user01", "context_id": "c37ac363-5c65-4832-aa25-fd3bbbc1b1e7", "text": null, "voice_text": null, "avatar_control_request": null, "audio_data": "XXXX", "metadata": {"request_text": "こんにちは"}}

data: {"type": "chunk", "session_id": "6d8ba9ac-a515-49be-8bf4-cdef021a169d", "user_id": "user01", "context_id": "c37ac363-5c65-4832-aa25-fd3bbbc1b1e7", "text": "[face:joy]こんにちは！", "voice_text": "こんにちは！", "avatar_control_request": {"animation_name": null, "animation_duration": null, "face_name": "joy", "face_duration": 4.0}, "audio_data": "XXXX", "metadata": {"is_first_chunk": true}}

data: {"type": "chunk", "session_id": "6d8ba9ac-a515-49be-8bf4-cdef021a169d", "user_id": "user01", "context_id": "c37ac363-5c65-4832-aa25-fd3bbbc1b1e7", "text": "今日はどんなことをお手伝いしましょうか？", "voice_text": "今日はどんなことをお手伝いしましょうか？", "avatar_control_request": {"animation_name": null, "animation_duration": null, "face_name": null, "face_duration": null}, "audio_data": "XXXX", "metadata": {"is_first_chunk": false}}

data: {"type": "final", "session_id": "6d8ba9ac-a515-49be-8bf4-cdef021a169d", "user_id": "user01", "context_id": "c37ac363-5c65-4832-aa25-fd3bbbc1b1e7", "text": "[face:joy]こんにちは！今日はどんなことをお手伝いしましょうか？", "voice_text": "こんにちは！今日はどんなことをお手伝いしましょうか？", "avatar_control_request": null, "audio_data": "XXXX", "metadata": {}}
```

To continue the conversation, include the `context_id` provided in the `start` response in your next request.
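
On the client side, the stream shown above can be consumed line by line. The following sketch parses `data:` lines and picks out the `context_id` for the next request; `iter_sse_events` and `summarize_stream` are hypothetical helper names, and the code only assumes the response fields shown in the sample.

```python
import json


def iter_sse_events(lines):
    """Yield the decoded JSON payload of every `data:` line."""
    for line in lines:
        line = line.strip()
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):])


def summarize_stream(lines):
    """Collect the context_id, per-chunk voice texts, and final text from a stream."""
    summary = {"context_id": None, "voice_texts": [], "final_text": None}
    for event in iter_sse_events(lines):
        if event["type"] == "start":
            summary["context_id"] = event["context_id"]  # reuse in the next request
        elif event["type"] == "chunk":
            summary["voice_texts"].append(event["voice_text"])
        elif event["type"] == "final":
            summary["final_text"] = event["text"]
    return summary
```

`lines` can be any iterable of strings, for example the lines yielded by an HTTP client's streaming response.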

**NOTE:** When using the RESTful API, voice activity detection (VAD) must be performed client-side.

**NOTE:** To protect the API with an API key, pass `api_key=API_KEY_YOU_MAKE` to `AIAvatarHttpServer` and have clients send the `Authorization: Bearer {API_KEY_YOU_MAKE}` HTTP header.


### 🔵 Dify-compatible API

`AIAvatarHttpServer` provides a Dify-compatible `/chat-messages` endpoint (SSE streaming only).
This allows you to connect frontend applications that use Dify as their backend.

For more details, refer to the [Dify API Guide](https://docs.dify.ai/en/guides/application-publishing/developing-with-apis)
or the API documentation of your self-hosted Dify application.


### 🔌 WebSocket

Below is the simplest example of a server program:

```python
from fastapi import FastAPI
from aiavatar.adapter.websocket.server import AIAvatarWebSocketServer

# Create AIAvatar
aiavatar_app = AIAvatarWebSocketServer(
    openai_api_key=OPENAI_API_KEY,
    volume_db_threshold=-30,  # <- Adjust for your audio env
    debug=True
)

# Set router to FastAPI app
app = FastAPI()
router = aiavatar_app.get_websocket_router()
app.include_router(router)
```

Save the above code as `server.py` and run it using:

```sh
uvicorn server:app
```

**NOTE:** When you specify `response_audio_chunk_size` in the `AIAvatarWebSocketServer` instance, the audio response will be streamed as PCM data chunks of the specified byte size. In this case, no WAVE header will be included in the response - you'll receive raw PCM audio data only.
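
If a client needs a playable file from such raw PCM chunks, it can add the WAVE header itself with the standard `wave` module. In this sketch, `pcm_chunks_to_wav` is a hypothetical helper, and the defaults (16 kHz, mono, 16-bit) are assumptions; use the actual format produced by your TTS service.

```python
import io
import wave


def pcm_chunks_to_wav(chunks, sample_rate=16000, channels=1, sample_width=2):
    """Wrap raw PCM chunks in a WAVE container and return the file bytes."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav:
        wav.setnchannels(channels)
        wav.setsampwidth(sample_width)   # bytes per sample (2 = 16-bit)
        wav.setframerate(sample_rate)
        for chunk in chunks:
            wav.writeframes(chunk)
    return buffer.getvalue()
```

For real-time playback you would normally feed the chunks straight to an audio output stream instead; wrapping them is mainly useful for saving or debugging responses.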


Next is the simplest example of a client program:

```python
import asyncio
from aiavatar.adapter.websocket.client import AIAvatarWebSocketClient

client = AIAvatarWebSocketClient()
asyncio.run(client.start_listening(session_id="ws_session", user_id="ws_user"))
```

Save the above code as `client.py` and run it using:

```sh
python client.py
```

You can now perform voice interactions just like when running locally.

**NOTE:** When using the WebSocket API, voice activity detection (VAD) is performed on the server side, so clients can simply stream microphone input directly to the server.


#### Connection and Disconnection Handling

You can register callbacks to handle WebSocket connection and disconnection events. This is useful for logging, session management, or custom initialization/cleanup logic.

```python
@aiavatar_app.on_connect
async def on_connect(request, session_data):
    print(f"Client connected: {session_data.id}")
    print(f"User ID: {session_data.user_id}")
    print(f"Session ID: {session_data.session_id}")
    
    # Custom initialization logic
    # e.g., load user preferences, initialize resources, etc.

@aiavatar_app.on_disconnect
async def on_disconnect(session_data):
    print(f"Client disconnected: {session_data.id}")
    
    # Custom cleanup logic
    # e.g., save session data, release resources, etc.
```

The `session_data` object contains information about the WebSocket session:

- `id`: Unique session identifier
- `user_id`: User identifier from the connection request
- `session_id`: Session identifier from the connection request
- Additional metadata passed during connection


### 🟩 LINE Bot

You can build a LINE Bot using the LINE Messaging API.

```python
# NOTE: Register https://{your.domain}/webhook as the "Webhook URL" in LINE Developers Console

# Create LINE Bot adapter
from aiavatar.adapter.linebot.server import AIAvatarLineBotServer
aiavatar_app = AIAvatarLineBotServer(
    openai_model="gpt-5.1",
    system_prompt="You are a cat.",
    openai_api_key=OPENAI_API_KEY,
    channel_access_token=LINEBOT_CHANNEL_ACCESS_TOKEN,
    channel_secret=LINEBOT_CHANNEL_SECRET,
    image_download_url_base="https://{your.domain}",
    debug=True
)

# Create FastAPI app
from fastapi import FastAPI
app = FastAPI()

# Set adapter endpoints
router = aiavatar_app.get_api_router()
app.include_router(router)
```

Note: `image_download_url_base` is optional. If omitted, images from users are embedded as base64 data URLs directly in the LLM context, eliminating the need for the LLM to fetch images from an external URL.

By default, the LINE Messaging API user ID is used as the AIAvatarKit user ID. To map channel user IDs to your own app-level user IDs, use `ChannelContextBridge`. See [Channel Context Bridge](#-channel-context-bridge) for details.

Customization hooks:

```python
@aiavatar_app.preprocess_request
async def preprocess_request(request: STSRequest):
    # Pre-process request before sending to LLM
    # e.g. edit request text
    request.text = "Pre-processed: " + request.text

@aiavatar_app.preprocess_response
async def preprocess_response(response: STSResponse):
    # Pre-process response before sending to LINE API
    # e.g. edit response voice_text (not text)
    response.voice_text = "Pre-processed: " + response.voice_text

@aiavatar_app.process_avatar_control_request
async def process_avatar_control_request(avatar_control_request: AvatarControlRequest, reply_message_request: ReplyMessageRequest):
    # Process facial expression
    # e.g. set `sender` to the message in reply_message_request to change icon
    face = avatar_control_request.face_name
    if face:
        reply_message_request.messages[0].sender = Sender(iconUrl=f"https://your_domain/path/to/icon/{face}.png")

@aiavatar_app.on_send_error_message
async def on_send_error_message(reply_message_request: ReplyMessageRequest, event: Event, ex: Exception):
    # Pre-process error message
    # e.g. edit error response
    text = make_user_friendly_error_message(event, ex)
    reply_message_request.messages[0] = TextMessage(text=text)

@aiavatar_app.event("postback")
async def handle_postback_event(event: Event, user_id: str, context_id: Optional[str]):
    # Process event
    # e.g. Register postback data
    await register_data(user_id, event.postback.data)
```


Context data is stored in `aiavatar.db` via SQLite by default. To use PostgreSQL, create a `PostgreSQLChannelContextBridge` and pass it to `AIAvatarLineBotServer` as `channel_context_bridge`. See [Channel Context Bridge](#-channel-context-bridge) for details.

```python
from aiavatar.adapter.channel_context_bridge.postgres import PostgreSQLChannelContextBridge
bridge = PostgreSQLChannelContextBridge(
    host=DB_HOST,
    port=DB_PORT,
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD
)

aiavatar_app = AIAvatarLineBotServer(
    openai_model="gpt-5.1",
    system_prompt="You are a cat.",
    openai_api_key=OPENAI_API_KEY,
    channel_access_token=LINEBOT_CHANNEL_ACCESS_TOKEN,
    channel_secret=LINEBOT_CHANNEL_SECRET,
    image_download_url_base="https://{your.domain}",
    channel_context_bridge=bridge,    # <- Set PostgreSQL context bridge
    debug=True
)
```


### STT / TTS Endpoints

`AIAvatarHttpServer` provides REST API endpoints for Speech-to-Text (STT) and Text-to-Speech (TTS) functionality.

#### STT Endpoint
`POST /stt` - Converts audio to text.

```python
import requests

# Read audio file
with open("audio.wav", "rb") as f:
    audio_data = f.read()

# Send to STT endpoint
response = requests.post(
    "http://localhost:8000/stt",
    data=audio_data,
    headers={"Content-Type": "audio/wav"}
)

print(response.json())  # {"text": "recognized speech"}
```

#### TTS Endpoint
`POST /tts` - Converts text to speech.

```python
import requests

# Send text to TTS endpoint
response = requests.post(
    "http://localhost:8000/tts",
    json={"text": "Hello, this is AI Avatar speaking"}
)

# Save audio response
with open("output.wav", "wb") as f:
    f.write(response.content)
```


## 🛡️ Guardrail

You can apply guardrails to both requests and responses.
Guardrails are custom implementations created by developers, and can block or replace an incoming request, or replace an outgoing response when certain conditions are met.

The example below shows how to implement guardrails and apply them.

```python
from aiavatar.sts.llm import Guardrail, GuardrailRespose

# Define guardrails
class RequestGuardrail(Guardrail):
    async def apply(self, context_id, user_id, text, files = None, system_prompt_params = None):
        if text.lower() == "problematic input":
            return GuardrailRespose(
                guardrail_name=self.name,
                is_triggered=True,
                action="block",
                text="The problematic input has been blocked."  # Immediately returns this message to the user
            )
        elif text.lower() == "hello":
            return GuardrailRespose(
                guardrail_name=self.name,
                is_triggered=True,
                action="replace",
                text="こんにちは"   # Replaces the original request text with this value
            )
        else:
            return GuardrailRespose(
                guardrail_name=self.name,
                is_triggered=False
            )

class ResponseGuardrail(Guardrail):
    async def apply(self, context_id, user_id, text, files = None, system_prompt_params = None):
        if "ramen" in text.lower():
            return GuardrailRespose(
                guardrail_name=self.name,
                is_triggered=True,
                action="replace",
                text="The problematic output has been blocked." # Emits an additional replacement chunk for the response
            )
        else:
            return GuardrailRespose(
                guardrail_name=self.name,
                is_triggered=False
            )

# Apply guardrails
service.guardrails.append(RequestGuardrail(applies_to="request"))
service.guardrails.append(ResponseGuardrail(applies_to="response"))
```

**NOTE:** When multiple guardrails are defined, they run in parallel.
Processing stops when all guardrails have finished evaluating or when the first guardrail returns a response with `is_triggered=True`.
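
The evaluation order described in this note can be pictured with plain `asyncio`. This is a minimal standalone sketch, not AIAvatarKit's internal code: `first_triggered`, `slow_ok`, `fast_block`, and the dict-shaped results are hypothetical stand-ins for the framework's guardrail objects.

```python
import asyncio


async def first_triggered(checks, text):
    """Run all checks concurrently; return the first triggered result, else None."""
    tasks = [asyncio.ensure_future(check(text)) for check in checks]
    try:
        for finished in asyncio.as_completed(tasks):
            result = await finished
            if result["is_triggered"]:
                return result
        return None
    finally:
        # Cancel checks that are still running once a decision is made
        for task in tasks:
            task.cancel()


async def slow_ok(text):
    # Hypothetical check that takes a while and never triggers
    await asyncio.sleep(0.2)
    return {"name": "slow_ok", "is_triggered": False}


async def fast_block(text):
    # Hypothetical check that triggers quickly on one specific input
    await asyncio.sleep(0.01)
    return {"name": "fast_block", "is_triggered": text == "problematic input"}
```

With `asyncio.run(first_triggered([slow_ok, fast_block], "problematic input"))`, the fast check decides the outcome without waiting for the slow one.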

**NOTE:** Response guardrails are evaluated only after the LLM response stream finishes.
This means the problematic output may be briefly visible to the user.
When a response is received with `metadata.is_guardrail_triggered = true`, the client should handle this by replacing or modifying the output accordingly.
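
On the client side, the replacement could be handled roughly as follows. This is a sketch under assumptions: the chunk shape (a dict with `text` and `metadata` keys) and the `render_response` helper are hypothetical, so adapt them to the payload your adapter actually delivers.

```python
def render_response(chunks):
    """Assemble displayed text from streamed chunks, honoring guardrail replacements."""
    displayed = ""
    for chunk in chunks:
        metadata = chunk.get("metadata") or {}
        if metadata.get("is_guardrail_triggered"):
            # Discard what was shown so far and show the replacement text instead
            displayed = chunk.get("text", "")
        else:
            displayed += chunk.get("text", "")
    return displayed


# Hypothetical stream: two normal chunks followed by a guardrail replacement
chunks = [
    {"text": "I love "},
    {"text": "ramen."},
    {
        "text": "The problematic output has been blocked.",
        "metadata": {"is_guardrail_triggered": True},
    },
]
final_text = render_response(chunks)
```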


## 🌎 Platform Guide

AIAvatarKit is capable of operating on any platform that allows applications to hook into audio input and output. The platforms that have been tested include:

- VRChat
- cluster
- Vket Cloud

In addition to running on PCs to operate AI avatars on these platforms, you can also create a communication robot by connecting speakers, a microphone, and, if possible, a display to a Raspberry Pi.

### 🐈 VRChat

* __2 Virtual audio devices (e.g. VB-CABLE) are required.__
* __Multiple VRChat accounts are required to chat with your AIAvatar.__


#### Get started

First, run the commands below in the Python interpreter to check your audio devices.

```sh
$ python

>>> from aiavatar import AudioDevice
>>> AudioDevice().list_audio_devices()
0: Headset Microphone (Oculus Virt
    :
6: CABLE-B Output (VB-Audio Cable
7: Microsoft サウンド マッパー - Output
8: SONY TV (NVIDIA High Definition
    :
13: CABLE-A Input (VB-Audio Cable A
    :
```

In this example,

- To use `VB-Cable-A` as the microphone for VRChat, set `output_device` to `13` (CABLE-A Input).
- To use `VB-Cable-B` as the speaker for VRChat, set `input_device` to `6` (CABLE-B Output). Don't forget to set `VB-Cable-B Input` as the default output device in Windows.
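
If you prefer to look up the indexes by name rather than reading them off the list, a small parser works. This is a sketch only: `find_device_index` is a hypothetical helper that operates on the printed listing text and is not part of AIAvatarKit.

```python
def find_device_index(listing, keyword):
    """Return the index of the first listed device whose name contains keyword."""
    for line in listing.splitlines():
        index, sep, name = line.partition(":")
        if sep and index.strip().isdigit() and keyword.lower() in name.lower():
            return int(index)
    return None


# Excerpt of the listing shown above
listing = """0: Headset Microphone (Oculus Virt
6: CABLE-B Output (VB-Audio Cable
13: CABLE-A Input (VB-Audio Cable A"""

input_device = find_device_index(listing, "CABLE-B Output")
output_device = find_device_index(listing, "CABLE-A Input")
```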

Then edit `run.py` as shown below.

```python
# Create AIAvatar
aiavatar_app = AIAvatar(
    openai_api_key=OPENAI_API_KEY,
    input_device=6,     # Listen sound from VRChat
    output_device=13,   # Speak to VRChat microphone
)
```

Run it.

```bash
$ python run.py
```

Launch VRChat in desktop mode on the machine that runs `run.py` and log in with the account for the AIAvatar. Then select `VB-Cable-A` as the microphone in the VRChat settings window.

That's all! Let's chat with the AIAvatar. Log in to VRChat on another machine (or Quest) and go to the world the AIAvatar is in.


#### Face Expression

AIAvatarKit controls facial expressions via [Avatar OSC](https://docs.vrchat.com/docs/osc-avatar-parameters).

LLM (ChatGPT/Claude/Gemini)  
↓ *response with face tag* `[face:joy]Hello!` or `<face name="joy" />Hello!`  
AIAvatarKit (VRChatFaceController)  
↓ *osc* `FaceOSC=1`  
VRChat (FX AnimatorController)  
↓  
😆

First, set up your avatar with the following steps:

1. Add the avatar parameter `FaceOSC` (type: int, default value: 0, saved: false, synced: true).
1. Add the `FaceOSC` parameter to the FX animator controller.
1. Add a layer to the FX animator controller and create states and transitions for the face expressions.
1. (Optional) If you use an avatar that is already used in VRChat, add the input parameter configuration to the avatar JSON.


Next, use `VRChatFaceController`.

```python
from aiavatar.face.vrchat import VRChatFaceController

# Set up VRChatFaceController
vrc_face_controller = VRChatFaceController(
    faces={
        "neutral": 0,   # always set `neutral: 0`

        # key = expression name that the LLM can understand
        # value = FaceOSC value set on the corresponding transition in the FX animator controller
        "joy": 1,
        "angry": 2,
        "sorrow": 3,
        "fun": 4
    }
)
```

Lastly, add a face expression section to the system prompt.

```python
# Make system prompt
system_prompt = """
# Face Expression

* You have following expressions:

- joy
- angry
- sorrow
- fun

* If you want to express a particular emotion, please insert it at the beginning of the sentence like [face:joy].

Example
[face:joy]Hey, you can see the ocean! [face:fun]Let's go swimming.
"""

# Set them to AIAvatar
aiavatar_app = AIAvatar(
    openai_api_key=OPENAI_API_KEY,
    face_controller=vrc_face_controller,
    system_prompt=system_prompt
)
```
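
To make the tag-to-value mapping concrete, here is a rough standalone sketch of how face tags in an LLM response could be turned into `FaceOSC` values. It is not the controller's actual implementation: `FACE_TAG` and `extract_faces` are hypothetical names, and falling back to `neutral` for unknown names is an assumption of this sketch.

```python
import re

FACES = {"neutral": 0, "joy": 1, "angry": 2, "sorrow": 3, "fun": 4}

# Matches both tag styles: [face:joy] and <face name="joy" />
FACE_TAG = re.compile(r'\[face:(\w+)\]|<face\s+name="(\w+)"\s*/>')


def extract_faces(text, faces=FACES):
    """Return (text_without_tags, list_of_FaceOSC_values) for one LLM response."""
    values = []
    for match in FACE_TAG.finditer(text):
        name = match.group(1) or match.group(2)
        # Unknown names fall back to neutral (an assumption of this sketch)
        values.append(faces.get(name, faces["neutral"]))
    return FACE_TAG.sub("", text), values


clean_text, osc_values = extract_faces(
    "[face:joy]Hey, you can see the ocean! [face:fun]Let's go swimming."
)
```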

You can test it through voice conversation as well as via the [REST API](#-restful-apis).


### 🍓 Raspberry Pi

Now writing... ✍️


## ⚙️ Administration

AIAvatarKit provides a built-in admin panel for monitoring, controlling, and evaluating your AI avatar from a web browser.

### Admin Panel

Set up the admin panel with a single function call. Once configured, access it at `/admin` on your server.

```python
from aiavatar.admin import setup_admin_panel

setup_admin_panel(
    app,
    adapter=aiavatar_app,
    evaluator=evaluator,                    # Optional: If omitted, the pipeline LLM settings are used
    character_service=character_service,    # Optional: If using CharacterService
    character_id=YOUR_CHARACTER_ID,         # Optional: Required if character_service is set
    api_key="your-api-key"                  # Optional: If omitted, no authentication is required
)
```

The admin panel includes:

- **Metrics** — Real-time performance metrics for the STS pipeline
- **Logs** — Conversation logs with voice playback
- **Control** — Send speech and conversation messages to the avatar
- **Config** — Adjust pipeline, VAD, STT, LLM, TTS, and adapter settings at runtime
- **Evaluation** — Run dialog evaluation scenarios
- **Character** — Manage character info, weekly schedule, daily schedules, diaries, and users (requires `character_service`)

To protect the admin panel with Basic authentication:

```python
setup_admin_panel(
    app,
    adapter=aiavatar_app,
    api_key="your-api-key",
    basic_auth_username="admin",
    basic_auth_password="your-password",
)
```

You can also supply your own HTML to fully customize the admin page:

```python
with open("my_admin.html", encoding="utf-8") as f:
    custom_html = f.read()

setup_admin_panel(
    app,
    adapter=aiavatar_app,
    html=custom_html,       # Use your own HTML instead of the built-in template
)
```

### REST API

All admin panel features are also available as REST API endpoints. See the interactive API documentation at `/docs` on your server for full details on request/response schemas.

### 📈 Observability

You can monitor the entire sequence - what requests are sent to the LLM, how they are interpreted, which tools are invoked, and what responses are generated from specific results or data - to support AIAvatar quality improvements and governance.

Since AIAvatarKit lets you replace the OpenAI client module with an alternative, you can leverage that capability to integrate with [Langfuse](https://langfuse.com).

```sh
pip install langfuse
```

```sh
export LANGFUSE_SECRET_KEY=sk-lf-XXXXXXXX
export LANGFUSE_PUBLIC_KEY=pk-lf-XXXXXXXX
export LANGFUSE_HOST=http://localhost:3000
```

```python
from langfuse.openai import openai as langfuse_openai
llm = ChatGPTService(
    openai_api_key=OPENAI_API_KEY,
    system_prompt="You are a helpful assistant.",
    model="gpt-4.1",
    custom_openai_module=langfuse_openai,   # Set langfuse OpenAI compatible client module
)
```


## 🦜 AI Agent

AIAvatarKit is not just a framework for creating chatty AI characters — it is designed to support agentic characters that can interact with APIs and external data sources (RAG).

### ⚡️ Tool Call

Register a tool with its spec using the `@aiavatar_app.sts.llm.tool` decorator. The spec must follow the format expected by the LLM you use.

```python
# Spec (for ChatGPT)
weather_tool_spec = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string"}
            },
        },
    }
}

# Implement tool and register it with spec
@aiavatar_app.sts.llm.tool(weather_tool_spec)
async def get_weather(location: str = None):
    weather = await weather_api(location=location)  # Call weather API
    return weather  # {"weather": "clear", "temperature": 23.4}
```

Alternatively, register the same tool programmatically:

```python
from aiavatar.sts.llm import Tool

aiavatar_app.sts.llm.add_tool(
    Tool("get_weather", weather_tool_spec, get_weather)
)
```

**Note**: When you register a tool with `add_tool`, the spec is automatically converted to the correct format for GPT, Gemini, or Claude, so you can define it once and use it everywhere.
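
To illustrate what such a conversion involves, the sketch below maps an OpenAI-style function spec to the tool shape used by Anthropic's Messages API (`name`, `description`, `input_schema`). `to_claude_spec` is a hypothetical helper written for this example; AIAvatarKit's own converter may differ in detail.

```python
def to_claude_spec(openai_spec):
    """Convert an OpenAI-style function spec into Anthropic's tool format."""
    function = openai_spec["function"]
    return {
        "name": function["name"],
        "description": function.get("description", ""),
        # Fall back to an empty object schema when no parameters are declared
        "input_schema": function.get(
            "parameters", {"type": "object", "properties": {}}
        ),
    }


weather_tool_spec = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
        },
    },
}

claude_spec = to_claude_spec(weather_tool_spec)
```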


Before creating your own tools, start with the example tools:

```python
# Google Search
from examples.tools.gemini_websearch import GeminiWebSearchTool
aiavatar_app.sts.llm.add_tool(GeminiWebSearchTool(gemini_api_key=GEMINI_API_KEY))

# Web Scraper
from examples.tools.webscraper import WebScraperTool
aiavatar_app.sts.llm.add_tool(WebScraperTool())
```


### ⌛️ Tool Call with Streaming Progress

Sometimes you may want to provide feedback to the user when a tool takes time to execute. AIAvatarKit supports tools that return stream responses (via `AsyncGenerator`), which allows you to integrate advanced and costly operations — such as interactions with AI Agent frameworks — into real-time voice conversations without compromising the user experience.

Here’s an example implementation. Intermediate progress is yielded with the second return value set to `False`, and the final result is yielded with `True`.

```python
@service.tool(weather_tool_spec)
async def get_weather_stream(location: str):
    # Progress: Geocoding
    yield {"message": "Resolving location"}, False
    geocode = await geocode_api(location=location)

    # Progress: Weather
    yield {"message": "Calling weather api"}, False
    weather = await weather_api(geocode=geocode)  # Call weather API

    # Final result (yield with `True`)
    yield {"weather": "clear", "temperature": 23.4}, True
```

On the user side, the first value in each yield will be streamed as a `progress` response under the `ToolCall` response type.

Additionally, you can yield string values directly to provide immediate voice feedback to the user during processing:

```python
@service.tool(weather_tool_spec)
async def get_weather_stream(location: str):
    # Provide voice feedback during processing
    yield "Converting location to geo code. Please wait a moment."
    geocode = await geocode_api(location=location)
    
    yield "Getting weather information."
    weather = await weather_api(geocode=geocode)
    
    # Final result
    yield {"weather": "clear", "temperature": 23.4}, True
```

When you yield a string (str) value, the AI avatar will speak that text while continuing to process the request.
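
The three kinds of yielded values can be told apart as shown in this standalone sketch. The `consume` function is a hypothetical consumer written for illustration, not the framework's dispatcher, and the generator here uses hard-coded values instead of real API calls.

```python
import asyncio


async def get_weather_stream(location):
    yield "Getting weather information."                    # spoken feedback
    yield {"message": "Calling weather api"}, False         # progress
    yield {"weather": "clear", "temperature": 23.4}, True   # final result


async def consume(tool_stream):
    """Sort yielded items into spoken text, progress updates, and the final result."""
    spoken, progress, final = [], [], None
    async for item in tool_stream:
        if isinstance(item, str):
            spoken.append(item)
        else:
            payload, is_final = item
            if is_final:
                final = payload
            else:
                progress.append(payload)
    return spoken, progress, final
```

Running `asyncio.run(consume(get_weather_stream("Tokyo")))` returns the spoken lines, the progress payloads, and the final result as three separate values.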


### 🔄 Background Tool Execution

For tools that take a long time to complete (e.g., AI agent calls, complex API orchestrations), AIAvatarKit supports **background execution**. Instead of blocking the conversation, the avatar immediately acknowledges the request and notifies the user when the result is ready via a callback.

To enable background execution, register an `on_completed` callback on the tool. This is the only requirement — the base `Tool` class handles task management, `task_id` generation, and metadata tracking automatically.

```python
from aiavatar.sts.llm import Tool

# Define tool as usual
heavy_task_spec = {
    "type": "function",
    "function": {
        "name": "run_heavy_task",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"}
            },
            "required": ["query"]
        },
    }
}

async def run_heavy_task(query: str, metadata: dict = None):
    result = await some_slow_api(query)  # Takes a long time
    return {"answer": result}

tool = Tool("run_heavy_task", heavy_task_spec, run_heavy_task)

# Enable background execution by registering on_completed callback
@tool.on_completed
async def on_completed(result, metadata):
    # result: return value from the tool function (or None on error)
    # metadata: dict containing task_id, user_id, context_id, session_id, channel, submitted_at, arguments, etc.
    answer = result["answer"]
    user_id = metadata["user_id"]
    context_id = metadata["context_id"]
    session_id = metadata["session_id"]

    async for resp in aiavatar_app.sts.invoke(
        STSRequest(
            session_id=session_id,
            user_id=user_id,
            context_id=context_id,
            text=f"Here is the result of the task:\n\n{answer}",
            wait_in_queue=True,
            skip_quick_response=True,
        )
    ):
        await aiavatar_app.handle_response(resp)

llm.add_tool(tool)
```

When background execution is enabled:

1. The tool function is called and runs in the background as an `asyncio.Task`
2. The avatar immediately responds with `immediate_message` (customizable) and a `task_id`
3. When the function completes, `on_completed` is called with the result and metadata
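
The flow above can be approximated with plain `asyncio`. This is a simplified sketch of the idea, not the `Tool` class itself: `submit_in_background` and `demo` are hypothetical, and the metadata here carries only `task_id` and `arguments`.

```python
import asyncio
import uuid


async def submit_in_background(func, arguments, on_completed, immediate_message="Accepted."):
    """Start func as an asyncio.Task and acknowledge immediately."""
    metadata = {"task_id": str(uuid.uuid4()), "arguments": arguments}

    async def runner():
        try:
            result = await func(**arguments)
        except Exception:
            result = None  # the callback receives None on error
        await on_completed(result, metadata)

    task = asyncio.create_task(runner())
    return {"message": immediate_message, "task_id": metadata["task_id"]}, task


async def demo():
    completed = []

    async def slow_task(query):
        await asyncio.sleep(0.05)
        return {"answer": query.upper()}

    async def on_completed(result, metadata):
        completed.append((result, metadata["task_id"]))

    ack, task = await submit_in_background(slow_task, {"query": "hi"}, on_completed)
    pending_at_ack = not task.done()  # the acknowledgement comes before the result
    await task
    return ack, pending_at_ack, completed
```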

You can customize the immediate message:

```python
tool = Tool(
    "run_heavy_task", heavy_task_spec, run_heavy_task,
    immediate_message="Got it! I'll work on that and let you know when it's done."
)
```

Optionally, register an `on_submitted` callback to be notified when the task is accepted:

```python
@tool.on_submitted
async def on_submitted(task_id, metadata):
    print(f"Task {task_id} submitted")
```

#### Background Timeout (Hybrid Mode)

Sometimes a tool *might* complete quickly but *could* take a long time. With `background_timeout`, AIAvatarKit tries synchronous execution first and falls back to background execution only if the timeout is exceeded.

```python
tool = Tool(
    "run_task", task_spec, run_task,
    background_timeout=3.0  # Try sync for 3 seconds, then go background
)

@tool.on_completed
async def on_completed(result, metadata):
    # Called only when the task didn't complete within the timeout
    print(f"Background result: {result}")
```

- If the tool completes within `background_timeout` seconds → result is returned directly (same as synchronous mode)
- If the tool exceeds the timeout → switches to background mode, returns `immediate_message`, and calls `on_completed` when done
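
One way to picture the hybrid behavior is `asyncio.wait_for` combined with `asyncio.shield`, as in the sketch below. This is an assumption about how such a fallback can be built, not a description of AIAvatarKit's internals; `run_hybrid` and `demo` are hypothetical.

```python
import asyncio


async def run_hybrid(func, arguments, background_timeout, on_completed,
                     immediate_message="Working on it."):
    """Return (response, background_task); background_task is None in sync mode."""
    task = asyncio.ensure_future(func(**arguments))
    try:
        # shield() keeps the task alive when wait_for() times out
        result = await asyncio.wait_for(asyncio.shield(task), background_timeout)
        return result, None
    except asyncio.TimeoutError:
        async def notify():
            await on_completed(await task, {"arguments": arguments})

        return {"message": immediate_message}, asyncio.create_task(notify())


async def demo(delay, timeout):
    notified = []

    async def work(query):
        await asyncio.sleep(delay)
        return {"answer": query}

    async def on_completed(result, metadata):
        notified.append(result)

    response, background = await run_hybrid(work, {"query": "q"}, timeout, on_completed)
    if background is not None:
        await background
    return response, notified
```

In this sketch, a fast call returns its result directly and never invokes the callback, while a slow call returns the immediate message and delivers the result through `on_completed`.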

**Note**: `on_completed` (background execution) and `AsyncGenerator` (streaming progress) are mutually exclusive. A tool should use one pattern or the other.


### 📋 Tool Response Formatter (Direct Response)

By default, after a tool executes, the result is passed back to the LLM to generate a human-friendly response (2nd LLM call). However, in some cases you may want to **bypass the LLM and speak the tool result directly**:

- **Accuracy**: Critical information (e.g., order details, reservation IDs) that must not be paraphrased or hallucinated
- **Latency**: Eliminating the 2nd LLM call for faster response times

Use the `@response_formatter` decorator to define a function that converts the tool result into the exact text to speak. When a `response_formatter` is set, the 2nd LLM call is skipped entirely, and the formatted text is spoken directly.

```python
@llm.tool(weather_tool_spec)
async def get_weather(location: str = None):
    weather = await weather_api(location=location)
    return weather  # {"weather": "clear", "temperature": 23.4}

# Register response_formatter to speak the result directly
@llm.tools["get_weather"].response_formatter
def format_weather(result, arguments):
    return f"The weather in {arguments['location']} is {result['weather']}, with a temperature of {result['temperature']} degrees."
```

The formatter receives two arguments:

| Argument | Description |
|----------|-------------|
| `result` | The dict returned by the tool function |
| `arguments` | The dict of arguments passed to the tool by the LLM |

The tool call and its result are still saved to conversation context, so follow-up questions like "What was the temperature again?" work naturally. The formatted text is stored as the assistant's response.

**Note**: Tools without a `response_formatter` continue to work as before (2nd LLM call generates the response). You can mix both patterns: some tools with formatters and others without.

#### Continuing Tool Chains with `continue_chain`

By default, `response_formatter` terminates the tool chain. No further LLM call is made, which maximizes speed. However, if the LLM calls multiple tools in sequence (e.g., check balance first, then fetch campaign info), a direct-response tool would break the chain and prevent subsequent tools from being called.

Use `continue_chain=True` to allow the chain to continue after the direct response:

```python
@llm.tools["get_balance"].response_formatter(continue_chain=True)
def format_balance(result, arguments):
    return f"Your balance is {result['balance']:,} {result['currency']}."
```

| Decorator | Behavior |
|-----------|----------|
| `@tool.response_formatter` | Direct response, **chain stops** (default, fastest) |
| `@tool.response_formatter(continue_chain=True)` | Direct response, **chain continues** (LLM can call more tools) |

When `continue_chain=True`, the formatted text is spoken immediately, and the tool result is also sent back to the LLM so it can decide whether to call additional tools. The LLM's text response for this round is suppressed to avoid duplication, but any subsequent tool calls and their responses proceed normally.
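
The decision described in this section boils down to two questions: what to speak, and whether to call the LLM again. The sketch below captures that logic in isolation; `handle_tool_result` is a hypothetical function for illustration and does not exist in AIAvatarKit.

```python
def handle_tool_result(result, arguments, formatter=None, continue_chain=False):
    """Return (text_to_speak, call_llm_again) for one finished tool call."""
    if formatter is None:
        # No formatter: the LLM writes the reply from the tool result
        return None, True
    # Formatter present: speak its text; call the LLM again only to continue the chain
    return formatter(result, arguments), continue_chain


def format_balance(result, arguments):
    return f"Your balance is {result['balance']:,} {result['currency']}."


balance = {"balance": 12000, "currency": "JPY"}
default_mode = handle_tool_result(balance, {})
direct_mode = handle_tool_result(balance, {}, format_balance)
chained_mode = handle_tool_result(balance, {}, format_balance, continue_chain=True)
```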


### 📦 Structured Content (Client-side Data)

By default, tool results (`data`) are passed back to the LLM as context. If you also want to send **structured data directly to the client application** (e.g., for rendering UI components, displaying charts, or updating app state), use `structured_content` in `ToolCallResult`.

```python
from aiavatar.sts.llm import ToolCallResult

@llm.tool(weather_tool_spec)
async def get_weather(location: str):
    weather = await weather_api(location)
    return ToolCallResult(
        data={"summary": f"{weather['temperature']}°C, {weather['condition']}"},  # → passed to LLM
        structured_content={"temperature": weather["temperature"], "condition": weather["condition"], "forecast": weather["forecast"]}  # → passed to client
    )
```

`structured_content` propagates through the entire response pipeline (`LLMResponse` → `STSResponse` → `AIAvatarResponse`) and is delivered to the client as a **top-level field** in the JSON response:

```json
{
    "type": "tool_call",
    "structured_content": {"temperature": 23.4, "condition": "sunny", "forecast": [...]},
    "metadata": {"tool_call": {"name": "get_weather", ...}}
}
```

You can also use `structured_content` with async generators for streaming scenarios:

```python
@llm.tool(search_tool_spec)
async def search(query: str):
    yield ToolCallResult(data={"status": "searching"}, is_final=False, structured_content={"loading": True})
    results = await do_search(query)
    yield ToolCallResult(data={"results": results}, is_final=True, structured_content={"loading": False, "items": results})
```

| Field | Destination | Purpose |
|-------|-------------|---------|
| `data` | LLM (as context) | Model uses this to generate a response |
| `structured_content` | Client application | Program handles this for UI/logic |

**Note**: `structured_content` defaults to `None`. Existing tools that return plain `dict` or use shorthand return types are unaffected.
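
On the client, handling this field might look like the following sketch. It assumes the response arrives as a JSON string shaped like the example above; `dispatch` and the `ui_state` dict are hypothetical placeholders for your application's own rendering logic.

```python
import json


def dispatch(raw_message, ui_state):
    """Apply structured_content from a tool_call response to local UI state."""
    message = json.loads(raw_message)
    content = message.get("structured_content")
    if message.get("type") == "tool_call" and content is not None:
        ui_state.update(content)
    return ui_state


raw = json.dumps({
    "type": "tool_call",
    "structured_content": {"temperature": 23.4, "condition": "sunny"},
    "metadata": {"tool_call": {"name": "get_weather"}},
})
state = dispatch(raw, {})
```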


### 🪄 Dynamic Tool Call

AIAvatarKit supports **dynamic Tool Calls**.
When many tools are loaded up-front, it becomes harder to make the model behave as intended and your system instructions explode in size. With AIAvatarKit’s **Dynamic Tool Call** mechanism you load **only the tools that are actually needed at the moment**, eliminating that complexity.

The overall flow is illustrated below.

![Dynamic Tool Call Mechanism](documents/images/dynamic_tool_call.png)

#### 1. Create the tool definitions and implementations  
*(exactly the same as with ordinary tools)*

```python
# Weather
get_weather_spec = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get weather info at the specified location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string"}
            }
        },
    }
}

async def get_weather(location: str):
    resp = await weather_api(location)
    return resp.json() # e.g. {"weather": "clear", "temperature": 23.4}

# Web Search
search_web_spec = {
    "type": "function",
    "function": {
        "name": "search_web",
        "description": "Search info from the internet websites",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"}
            }
        },
    }
}

async def search_web(query: str) -> dict:
    resp = await web_search_api(query)
    return resp.json() # e.g. {"results": [{...}]}
```

#### 2. Register the tools as dynamic in the AIAvatarKit LLM service

Setting `is_dynamic=True` tells the framework not to expose the tool by default;
AIAvatarKit will inject it only when the Trigger Detection Tool decides the tool is relevant.
You can also supply an `instruction` string that will be spliced into the system prompt on-the-fly.

```python
from aiavatar.sts.llm import Tool

llm = aiavatar_app.sts.llm

# Turn on Dynamic Tool Mode
llm.use_dynamic_tools = True

# Register as Dynamic Tools
llm.tools["get_weather"] = Tool(
    "get_weather",
    get_weather_spec,
    get_weather,
    instruction=(
        "## Use of `get_weather`\n\n"
        "Call this tool to obtain current weather or a forecast. "
        "Argument:\n"
        "- `location`: city name or geo-hash."
    ),
    is_dynamic=True,
)

llm.tools["search_web"] = Tool(
    "search_web",
    search_web_spec,
    search_web,
    instruction=(
        "## Use of `search_web`\n\n"
        "Call this tool to look up information on the public internet. "
        "Argument:\n"
        "- `query`: keywords describing what you want to find."
    ),
    is_dynamic=True,
)
```

Or, register via `add_tool`.

```python
# Define tool without `is_dynamic` for other use cases
weather_tool = Tool("get_weather", get_weather_spec, get_weather, instruction="...")

# Register tool via `add_tool` with `is_dynamic`
llm.add_tool(weather_tool, is_dynamic=True)
```


#### 3. Tweak the system prompt so the model knows how to use tools

Append a concise “How to use external tools” section (example below).
Replace the example tools with those your application actually relies on for smoother behaviour.


```md
## Use of External Tools

When external tools, knowledge, or data are required to process a user's request, use the appropriate tools.  
The following rules **must be strictly followed** when using tools.

### Arguments

- Use only arguments that are **explicitly specified by the user** or that can be **reliably inferred from the conversation history**.
- **If information is missing**, ask the user for clarification or use other tools to retrieve the necessary data.
- **It is strictly forbidden** to use values as arguments that are not based on the conversation.

### Tool Selection

When a specialized tool is available for a specific purpose, use that tool.  
If you can use only `execute_external_tool`, use it.

Examples where external tools are needed:

- Retrieving weather information  
- Retrieving memory from past conversations  
- Searching for, playing, or otherwise controlling music  
- Performing web searches  
- Accessing real-world systems or data to provide better solutions
```

With these three steps, your AI agent stays lean—loading only what it needs—while still having immediate access to a rich arsenal of capabilities whenever they’re truly required.


#### Custom Tool Repository

By default AIAvatarKit simply hands the **entire list of dynamic tools** to the LLM and lets the model decide which ones match the current context. This approach works for a moderate number of tools, but the size of the prompt places a hard limit on how many candidates you can include.

For larger-scale systems, pair AIAvatarKit with a retrieval layer (e.g., a vector-search index) so that, out of thousands of available tools, only the handful that are truly relevant are offered to the model.

AIAvatarKit supports this pattern through the `get_dynamic_tools` hook.
Register an async function decorated with `@llm.get_dynamic_tools`; it should return a list of **tool specification objects** for the current turn.

```python
@llm.get_dynamic_tools
async def my_get_dynamic_tools(messages: list, metadata: dict) -> list:
    # Retrieve candidate tools from your vector database (or any other store)
    tools = await search_tools_from_vector_db(messages, metadata)
    # Extract and return the spec objects (not the implementations)
    return [t.spec for t in tools]
```
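
The `search_tools_from_vector_db` function above is left to you. As a rough stand-in for experimentation, the sketch below ranks specs by word overlap between the query and each tool's name and description. It is deliberately simplistic and is not a vector search; `rank_tool_specs` and the two sample specs are hypothetical.

```python
import re


def rank_tool_specs(query, specs, limit=2):
    """Return up to `limit` specs, ranked by word overlap with the query."""
    words = set(re.findall(r"[a-z]+", query.lower()))

    def score(spec):
        function = spec["function"]
        text = f"{function['name']} {function.get('description', '')}".lower()
        return len(words & set(re.findall(r"[a-z]+", text)))

    ranked = sorted(specs, key=score, reverse=True)
    # Drop specs that share no words with the query
    return [spec for spec in ranked if score(spec) > 0][:limit]


specs = [
    {"type": "function", "function": {
        "name": "get_weather",
        "description": "Get weather info at the specified location"}},
    {"type": "function", "function": {
        "name": "search_web",
        "description": "Search info from the internet websites"}},
]

matches = rank_tool_specs("weather in tokyo", specs, limit=1)
```

In a real deployment you would replace the scoring function with an embedding similarity lookup and keep the same return shape: a list of spec objects.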

### 🔌 MCP

AIAvatarKit supports tools provided by MCP (Model Context Protocol) servers.

First, install the required `FastMCP` dependency.

```sh
pip install fastmcp
```

The following steps show how to retrieve tools from MCP servers and register them to `LLMService`.

Both Streamable HTTP and standard I/O are supported. The simplest approach is shown in `mcp1` and `mcp3`, but you can also add authentication headers as in `mcp2`, filter tools to only what you need, or customize parts of the schema or execution logic.

```python
from aiavatar.sts.llm import Tool
from aiavatar.sts.llm.chatgpt import ChatGPTService
llm = ChatGPTService(openai_api_key=OPENAI_API_KEY)

from aiavatar.sts.llm.tools.mcp import StreamableHttpMCP, StdioMCP

# MCP Server
mcp1 = StreamableHttpMCP(url=MCP1_URL)
mcp1.for_each_tool = llm.add_tool

# MCP Server with Auth
mcp2 = StreamableHttpMCP(url=MCP2_URL, headers={"Authorization": f"Bearer {MCP_JWT}"})
@mcp2.for_each_tool
def mcp2_tools(tool: Tool):
    # Do something here (e.g. edit schema or func)
    llm.add_tool(tool)

# MCP Server (Std I/O)
mcp3 = StdioMCP(server_script="weather.py") # supports .py and .js
mcp3.for_each_tool = llm.add_tool
```

### 🛠️ Built-in Tools

You can use the following tools out of the box 📦.

- 🔍 Web Search
    - Gemini Search
    - OpenAI Search
    - Grok Search
- 🌏 Web Scraper
- 🖼️ Image Generation
    - 🍌 Nano Banana
    - 🐓 Selfie

```python
# Web Search
from aiavatar.sts.llm.tools.gemini_websearch import GeminiWebSearchTool
google_search_tool = GeminiWebSearchTool(gemini_api_key=GEMINI_API_KEY)
llm.add_tool(google_search_tool)

from aiavatar.sts.llm.tools.openai_websearch import OpenAIWebSearchTool
web_search_tool = OpenAIWebSearchTool(openai_api_key=OPENAI_API_KEY)
llm.add_tool(web_search_tool)

from aiavatar.sts.llm.tools.grok_search import GrokSearchTool
grok_web_search_tool = GrokSearchTool(xai_api_key=XAI_API_KEY)
llm.add_tool(grok_web_search_tool)

# Web Scraper
from aiavatar.sts.llm.tools.webscraper import WebScraperTool
webscraper_tool = WebScraperTool()
# webscraper_tool = WebScraperTool(openai_api_key=OPENAI_API_KEY, return_summary=True)  # Provides summary instead of full innerText (recommended)
llm.add_tool(webscraper_tool)

# Image Generation
from aiavatar.sts.llm.tools.nanobanana import NanoBananaTool
nanobanana_tool = NanoBananaTool(gemini_api_key=GEMINI_API_KEY)
llm.add_tool(nanobanana_tool)

from aiavatar.sts.llm.tools.nanobanana import NanoBananaSelfieTool
selfie_tool = NanoBananaSelfieTool(gemini_api_key=GEMINI_API_KEY, reference_image=image_bytes_or_image_url_of_file_api)
llm.add_tool(selfie_tool)
```


### 🦞 OpenClaw / Hermes

`OpenClawTool` integrates [OpenClaw](https://openclaw.ai) or [Hermes](https://github.com/nousresearch/hermes-agent), versatile AI agents, as a tool for your avatar. When the LLM determines that the user's request requires autonomous task execution (web search, data analysis, code execution, etc.), it delegates the task to the agent.

```python
from aiavatar.sts.llm.tools.openclaw_tool import OpenClawTool

# OpenClaw
openclaw_tool = OpenClawTool(
    openclaw_api_key=OPENCLAW_API_KEY,
    openclaw_base_url=OPENCLAW_BASE_URL,
    openclaw_session_key="agent:main:main",  # Set if you want to use a fixed session
    debug=True
)

# Hermes
openclaw_tool = OpenClawTool(
    openclaw_api_key=HERMES_API_KEY,
    openclaw_base_url=HERMES_BASE_URL,
    openclaw_session_key_key="X-Hermes-Session-Id",
    debug=True
)

llm.add_tool(openclaw_tool)
```

When `stream=True` is set, you can monitor the agent's intermediate steps (tool usage, code execution, etc.) via the `on_stream_chunk` handler:

```python
openclaw_tool = OpenClawTool(
    openclaw_api_key=OPENCLAW_API_KEY,
    openclaw_base_url=OPENCLAW_BASE_URL,
    stream=True
)

@openclaw_tool.on_stream_chunk
async def handle_chunk(chunk):
    if chunk.tool:
        print(f"[{chunk.emoji}] {chunk.tool}: {chunk.label}")
```

When `on_completed` is registered, OpenClaw runs asynchronously in the background — the avatar immediately acknowledges the request and notifies the user when the result is ready. The approach for delivering the result depends on your adapter.

#### Push-based delivery (WebSocket / Local)

For adapters that support server-initiated messages, use `on_completed` to push the result back through the pipeline:

```python
@openclaw_tool.on_completed
async def on_completed(result, metadata):
    answer = result["answer"]
    user_id = metadata["user_id"]
    context_id = metadata["context_id"]
    session_id = metadata["session_id"]

    async for resp in aiavatar_app.sts.invoke(
        STSRequest(
            session_id=session_id,
            user_id=user_id,
            context_id=context_id,
            text=f"$OpenClaw has returned a response. Please relay the following to the user:\n\n{answer}",
            wait_in_queue=True,
            skip_quick_response=True,
        )
    ):
        await aiavatar_app.handle_response(resp)
```

#### Polling-based delivery (HTTP)

For HTTP adapters where the SSE stream has already closed by the time the background task completes, store results in a buffer and let the client poll for them. The tool returns a `task_id` in its response for this purpose.

Register callbacks to track task lifecycle:

```python
import time as time_module
task_results = {}
TASK_TIMEOUT = 300  # 5 minutes

@openclaw_tool.on_submitted
async def on_submitted(task_id: str, metadata: dict):
    task_results[task_id] = {
        "task_id": task_id,
        "submitted_at": metadata.get("submitted_at", time_module.time()),
        "answer": None,
    }

@openclaw_tool.on_completed
async def on_completed(result, metadata):
    task_id = metadata["task_id"]
    task_results[task_id]["answer"] = result["answer"]
```

Add a polling endpoint for the client to retrieve results:

```python
from fastapi import Response

@app.get("/tasks/{task_id}")
async def get_task_result(task_id: str):
    result = task_results.get(task_id)
    if result is None:
        return Response(status_code=204)
    if result["answer"] is not None:
        task_results.pop(task_id, None)
        return {"task_id": task_id, "answer": result["answer"], "status": "completed"}
    if time_module.time() - result["submitted_at"] > TASK_TIMEOUT:
        task_results.pop(task_id, None)
        return {"task_id": task_id, "answer": None, "status": "timeout"}
    return Response(status_code=204)
```

The client receives the `task_id` from the avatar's immediate response and polls `GET /tasks/{task_id}` until it gets a result (`status: "completed"`) or a timeout (`status: "timeout"`). A `204` response means the task is still in progress.

Once the client retrieves the answer, it can send it back to the avatar as a new request, for example `f"$OpenClaw has returned a response. Please relay the following to the user:\n\n{answer}"`, to have the avatar speak the result aloud.
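
On the client side, the polling loop can be as small as the sketch below. It is only an illustration: `poll_task` and `fake_fetch` are hypothetical names, and `fetch` stands in for whatever HTTP call your client makes to `GET /tasks/{task_id}`. The loop treats `204` as "still in progress" and stops on `completed` or `timeout`.

```python
import asyncio

async def poll_task(fetch, task_id, interval=1.0, max_attempts=60):
    """Poll until the server reports a terminal status.

    `fetch` is a hypothetical coroutine that takes a task_id and returns
    (status_code, body). Replace it with your HTTP client call.
    """
    for _ in range(max_attempts):
        status_code, body = await fetch(task_id)
        if status_code == 204:
            # Still in progress: wait and try again
            await asyncio.sleep(interval)
            continue
        if body.get("status") in ("completed", "timeout"):
            return body
    # The client gave up before the server reported a terminal status
    return {"task_id": task_id, "answer": None, "status": "gave_up"}

# Stand-in for the real endpoint: two 204 responses, then a completed result
_responses = iter([
    (204, None),
    (204, None),
    (200, {"task_id": "t1", "answer": "done", "status": "completed"}),
])

async def fake_fetch(task_id):
    return next(_responses)

result = asyncio.run(poll_task(fake_fetch, "t1", interval=0.01))
print(result["status"], result["answer"])
```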

#### Progress tracking

When OpenClaw runs asynchronously, users may ask "How's it going?" before the task completes. The built-in progress tracking lets the avatar answer with real-time status.

`OpenClawTool` automatically tracks running tasks and, when `stream=True`, updates progress with the agent's intermediate steps (tool calls, labels, etc.) as they stream in.

Register the check tool alongside the main tool:

```python
openclaw_tool = OpenClawTool(
    openclaw_api_key=OPENCLAW_API_KEY,
    openclaw_base_url=OPENCLAW_BASE_URL,
    stream=True,  # Enables detailed progress from streaming chunks
)

llm.add_tool(openclaw_tool)
llm.add_tool(openclaw_tool.create_check_tool())
```

That's it. When the user asks about progress, the LLM calls `check_running_openclaw_tasks` and gets the current status:

```json
{
  "running_tasks": [
    {
      "request": "Search for the latest news about AI",
      "progress": "Start processing...\n- 🔍 web_search: searching for AI news\n- 📄 read_page: reading article\n"
    }
  ]
}
```

You can customize the tool name and description:

```python
openclaw_tool.create_check_tool(
    name="check_agent_status",
    description="Check what the AI agent is currently working on."
)
```

#### Report channel routing

By default, task results are delivered back to the same channel (WebSocket, phone, LINE, etc.) that initiated the request. You can override this by specifying a `report_channel` — either at invocation time via the tool parameter, or dynamically while the task is running.

The LLM can set the channel at invocation:

```python
# LLM calls: send_query_to_openclaw(query="...", report_channel="linebot")
```

Or change it mid-flight using the set report channel tool:

```python
llm.add_tool(openclaw_tool.create_set_report_channel_tool())
```

This allows the LLM to call `set_openclaw_report_channel(task_id="...", report_channel="sms")` while the task is running, redirecting where the result will be reported.

#### Per-user configuration

In multi-user environments, each user can connect to their own OpenClaw or Hermes instance with independent credentials. Users without a configuration will receive an error message instead of calling the API.

```python
from aiavatar.sts.llm.tools.openclaw_tool import OpenClawTool, OpenClawConfig

openclaw_tool = OpenClawTool(
    openclaw_configs={
        "user_id_1": OpenClawConfig(
            openclaw_api_key=USER1_API_KEY,
            openclaw_base_url=USER1_BASE_URL,
        ),
        "user_id_2": OpenClawConfig(
            openclaw_api_key=USER2_API_KEY,
            openclaw_base_url=USER2_HERMES_URL,
            openclaw_session_key_key="X-Hermes-Session-Id",
            openclaw_model="hermes-agent",
        ),
    },
    stream=True,
)
```

Per-user configs are merged with the tool-level defaults. Only the fields you specify are overridden. You can also manage configs at runtime:

```python
# Add or update
openclaw_tool.update_openclaw_config("user_id_3", OpenClawConfig(
    openclaw_api_key="new-key",
    openclaw_base_url="https://my-hermes.example.com",
))

# Remove (reverts to tool defaults)
openclaw_tool.delete_openclaw_config("user_id_3")
```
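
To picture the "only the fields you specify are overridden" rule, here is a minimal sketch of one way such a merge could work. `ExampleConfig` and `merge_config` are hypothetical stand-ins, not the actual `OpenClawConfig` implementation, and the sketch assumes that `None` means "not specified".

```python
from dataclasses import dataclass, fields, replace
from typing import Optional

@dataclass
class ExampleConfig:
    # Hypothetical stand-in for OpenClawConfig; None means "not specified"
    api_key: Optional[str] = None
    base_url: Optional[str] = None
    model: Optional[str] = None

def merge_config(defaults: ExampleConfig, override: ExampleConfig) -> ExampleConfig:
    # Collect only the fields the per-user config actually sets
    specified = {
        f.name: getattr(override, f.name)
        for f in fields(override)
        if getattr(override, f.name) is not None
    }
    # Return a new config; the defaults object is left untouched
    return replace(defaults, **specified)

defaults = ExampleConfig(
    api_key="default-key",
    base_url="https://default.example.com",
    model="default-model",
)
merged = merge_config(defaults, ExampleConfig(api_key="user-key"))
print(merged)
```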


## 🧪 Evaluation

AIAvatarKit includes a comprehensive evaluation framework for testing and assessing AI avatar conversations. The `DialogEvaluator` enables scenario-based conversation execution with automatic evaluation capabilities.

### Features

- **Scenario Execution**: Run predefined dialog scenarios against your AI system
- **Turn-by-Turn Evaluation**: Evaluate each conversation turn against specific criteria
- **Goal Assessment**: Evaluate overall scenario objective achievement
- **Result Management**: Save, load, and display evaluation results

### Basic Usage

```python
import asyncio
from aiavatar.eval.dialog import DialogEvaluator, Scenario, Turn
from aiavatar.sts.llm.chatgpt import ChatGPTService

async def main():
    # Initialize LLM services
    llm = ChatGPTService(openai_api_key="your_api_key")
    evaluation_llm = ChatGPTService(openai_api_key="your_api_key")
    
    # Create evaluator
    evaluator = DialogEvaluator(
        llm=llm,                    # LLM for conversation
        evaluation_llm=evaluation_llm  # LLM for evaluation
    )
    
    # Define scenario
    scenario = Scenario(
        name="Order tracking support",
        goal="Provide efficient and helpful customer service for order tracking inquiries",
        turns=[
            Turn(
                input_text="Hello, I need help with my order",
                evaluation_criteria="Responds politely and shows willingness to help"
            ),
            Turn(
                input_text="My order number is 12345",
                evaluation_criteria="Acknowledges the order number and proceeds appropriately"
            )
        ]
    )
    
    # Run evaluation
    results = await evaluator.run(
        dataset=[scenario],
        detailed=True,                # Enable turn-by-turn evaluation
        overwrite_execution=False,    # Skip if already executed
        overwrite_evaluation=False    # Skip if already evaluated
    )
    
    # Display results
    evaluator.print_results(results)
    
    # Save results
    evaluator.save_results(results, "evaluation_results.json")

if __name__ == "__main__":
    asyncio.run(main())
```

Example Output:

```
=== Scenario 1 ===
Goal: Provide efficient and helpful customer service for order tracking inquiries

Turn 1:
  Input: Hello, I need help with my order
  Actual Output: Hello! I'd be happy to help you with your order. Could you please provide your order number?
  Result: ✓ PASS
  Reason: The response is polite, helpful, and appropriately asks for the order number.

Turn 2:
  Input: My order number is 12345
  Actual Output: Thank you for providing order number 12345. Let me look that up for you.
  Result: ✓ PASS
  Reason: Acknowledges the order number and shows willingness to help.

Summary: 2/2 turns passed (100.0%)

=== Overall Scenario Evaluation ===
Goal Achievement: ✓ SUCCESS
Reason: The AI successfully provided helpful customer support by responding politely and efficiently handling the order inquiry.
```

### File-Based Evaluation

Load scenarios from JSON files:

```json
{
  "scenarios": [
    {
      "goal": "Basic greeting and assistance",
      "turns": [
        {
          "input_text": "Hello",
          "expected_output": "Friendly greeting",
          "evaluation_criteria": "Responds warmly and appropriately"
        }
      ]
    }
  ]
}
```

```python
# Load and evaluate from file
results = await evaluator.run(dataset="test_scenarios.json")

# Save results back to file
evaluator.save_results(results, "results.json")
```
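
Before running a large scenario file, it can help to sanity-check its shape. The sketch below is a hypothetical helper (`validate_dataset` is not part of AIAvatarKit) that assumes the layout from the JSON example above and treats `input_text` as required for every turn.

```python
import json

def validate_dataset(raw: str) -> list:
    """Return a list of problems found; an empty list means the file looks usable."""
    data = json.loads(raw)
    scenarios = data.get("scenarios")
    if not isinstance(scenarios, list):
        return ["top-level 'scenarios' must be a list"]
    problems = []
    for i, scenario in enumerate(scenarios):
        turns = scenario.get("turns") or []
        if not turns:
            problems.append(f"scenario {i}: no turns")
        for j, turn in enumerate(turns):
            # Assumption: every turn needs a non-empty input_text
            if not turn.get("input_text"):
                problems.append(f"scenario {i} turn {j}: missing input_text")
    return problems

good = '{"scenarios": [{"goal": "Greeting", "turns": [{"input_text": "Hello"}]}]}'
bad = '{"scenarios": [{"goal": "Greeting", "turns": [{"evaluation_criteria": "Warm"}]}]}'
print(validate_dataset(good))
print(validate_dataset(bad))
```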

### Configuration Options

```python
# Execution modes
results = await evaluator.run(
    dataset=scenarios,
    detailed=True,                # Turn-by-turn evaluation
    overwrite_execution=True,     # Re-run conversations
    overwrite_evaluation=True     # Re-evaluate results
)

# Simple mode (scenario-level evaluation only)
results = await evaluator.run(
    dataset=scenarios,
    detailed=False
)
```

### Use via Config API

You can evaluate scenarios on the fly via the Config API:

```python
# Create the evaluator
from aiavatar.eval.dialog import DialogEvaluator
from aiavatar.sts.llm.chatgpt import ChatGPTService
eval_llm = ChatGPTService(openai_api_key=OPENAI_API_KEY)
evaluator = DialogEvaluator(llm=aiavatar_app.sts.llm, evaluation_llm=eval_llm)

# Activate Config API
from aiavatar.admin.config import ConfigAPI
config_router = ConfigAPI(aiavatar_app.sts, evaluator=evaluator).get_router()   # Set evaluator here
app.include_router(config_router)
```

### Logic-based evaluation

In addition to LLM-based evaluation using `evaluation_criteria`, you can evaluate more explicitly using custom logic functions.

```python
# Make evaluation function(s)
def evaluate_weather_tool_call(output_text, tool_call, evaluation_criteria, result, eval_result_text):
    if tool_call is not None and tool_call.name != "get_weather":
        # Overwrite result and reason
        return False, f"Incorrect tool call: {tool_call.name}"
    else:
        # Pass through
        return result, eval_result_text

# Register evaluation function(s)
evaluator = DialogEvaluator(
    llm=aiavatar_app.sts.llm,
    evaluation_llm=eval_llm,
    evaluation_functions={"evaluate_weather_tool_call_func": evaluate_weather_tool_call}
)

# Use evaluation function in scenario
scenario = Scenario(
    turns=[
        Turn(input_text="Hello", expected_output_text="Hi", evaluation_criteria="Greeting"),
        Turn(input_text="What is the weather in Tokyo?", expected_output_text="It's sunny.", evaluation_criteria="Answer the weather based on the result of calling get_weather tool.", evaluation_function_name="evaluate_weather_tool_call_func"),
    ],
    goal="Answer the weather in Tokyo based on the result of get_weather."
)
```
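
You can exercise an evaluation function on its own before wiring it into `DialogEvaluator`. The sketch below repeats the function from the example and calls it with `SimpleNamespace` objects as stand-ins for the real tool call object. The stand-in only assumes a `.name` attribute, as used in the example above.

```python
from types import SimpleNamespace

def evaluate_weather_tool_call(output_text, tool_call, evaluation_criteria, result, eval_result_text):
    if tool_call is not None and tool_call.name != "get_weather":
        # Overwrite result and reason
        return False, f"Incorrect tool call: {tool_call.name}"
    # Pass the LLM-based result through unchanged
    return result, eval_result_text

llm_verdict = (True, "LLM judged: pass")

# A wrong tool overrides the LLM verdict
wrong = evaluate_weather_tool_call("It's sunny.", SimpleNamespace(name="get_news"), "criteria", *llm_verdict)
# The expected tool keeps the LLM verdict
right = evaluate_weather_tool_call("It's sunny.", SimpleNamespace(name="get_weather"), "criteria", *llm_verdict)
# No tool call at all also passes through
no_call = evaluate_weather_tool_call("Hi", None, "Greeting", *llm_verdict)

print(wrong, right, no_call)
```

Note that a turn with no tool call passes through unchanged. If a missing call should fail the turn, return `False` for `tool_call is None` as well.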


## 🤿 Deep dive

Advanced usage.


### 🐘 PostgreSQL

You can use PostgreSQL instead of the default SQLite. We strongly recommend using PostgreSQL in production environments for its scalability and performance benefits from asynchronous processing.

To use PostgreSQL, install asyncpg and create a `PostgreSQLPoolProvider` to manage the shared connection pool. Then pass it to the constructors of the components that need database access.


```sh
pip install asyncpg
```

```python
# DB_CONNECTION_STR = "postgresql://{user}:{password}@{host}:{port}/{databasename}"
DB_CONNECTION_STR = "postgresql://postgres:postgres@127.0.0.1:5432/aiavatar"

# PoolProvider
from aiavatar.database.postgres import PostgreSQLPoolProvider
pool_provider = PostgreSQLPoolProvider(
    connection_str=DB_CONNECTION_STR,
    # max_size=20,  # Max connection count (default: 20)
    # min_size=5    # Min connection count (default: 5)
)

# Character
from aiavatar.character import CharacterService
character_service = CharacterService(
    openai_api_key=OPENAI_API_KEY,
    db_pool_provider=pool_provider,     # Creates PostgreSQLCharacterRepository and PostgreSQLActivityRepository internally
)

# LLM
from aiavatar.sts.llm.context_manager.postgres import PostgreSQLContextManager
llm = ChatGPTService(
    openai_api_key=OPENAI_API_KEY,
    system_prompt=SYSTEM_PROMPT,
    context_manager=PostgreSQLContextManager(
        get_pool=pool_provider.get_pool # Set `get_pool` to PostgreSQLContextManager
    )
)

# Adapter (creates the pipeline internally)
ws_app = AIAvatarWebSocketServer(
    vad=vad,
    stt=stt,
    llm=llm,
    tts=tts,
    db_pool_provider=pool_provider,     # Creates PostgreSQLSessionStateManager and PostgreSQLPerformanceRecorder internally
)
```

**NOTE**: You can also pass PostgreSQL connection settings directly to each component's constructor to manage and use individual connections separately from the shared connection pool. However, this makes it difficult to manage the total number of connections, especially when using multiple workers. We recommend using the shared pool unless you have a specific reason not to.

**NOTE**: `PerformanceRecorder` runs in a separate thread from the main thread, so it does not use the shared connection pool. Instead, it retrieves only the connection information from the PoolProvider and creates its own dedicated connection pool. It writes performance information serially as it receives it through a queue, so it basically uses only a single connection. We recommend not changing this unless you have a specific reason.


### ⚠️ LLM Error Handling

You can handle errors that occur during LLM API calls by using the `on_error` decorator. This is useful for customizing avatar responses when content filters are triggered or when API errors occur.

```python
from aiavatar.sts.llm import LLMResponse

@llm.on_error
async def on_error(llm_response: LLMResponse):
    ex = llm_response.error_info.get("exception")   # Get exception
    error_json = llm_response.error_info.get("response_json", {})   # Get response JSON from OpenAI

    # Make response
    if error_json.get("error", {}).get("code") == "content_filter":
        llm_response.text = "[face:angry]You shouldn't say that!"
        llm_response.voice_text = "You shouldn't say that!"
    else:
        llm_response.text = "[face:sorrow]An error occurred"
        llm_response.voice_text = "An error occurred"
```
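
If `response_json` may be missing or `None`, a small defensive helper keeps the handler from raising while it is handling an error. This is a sketch only: `extract_error_code` is a hypothetical helper, and the `error_info` layout is assumed from the example above.

```python
def extract_error_code(error_info: dict):
    """Return the provider error code, or None when it is not available."""
    # `or {}` also covers keys that are present but set to None
    response_json = error_info.get("response_json") or {}
    error = response_json.get("error") or {}
    return error.get("code")

print(extract_error_code({"response_json": {"error": {"code": "content_filter"}}}))
print(extract_error_code({"response_json": None}))
```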

**NOTE**: When an error occurs, the conversation context is not updated. This is intentional because including the programmatically overwritten response in the context may cause unexpected LLM behavior in subsequent conversations.


### 🖍️ Custom Chat Logging

Use the `print_chat` decorator to customize how user/AI conversation turns are logged.

```python
import logging
import re

logger = logging.getLogger(__name__)

@llm.print_chat
def print_chat(role, context_id, user_id, text, files):
    if role == "user":
        logger.info(f"\033[1;32mUser:\033[0m {text}")
    else:
        think_match = re.search(r"<think>(.*?)</think>", text, re.DOTALL)
        answer_match = re.search(r"<answer>(.*?)</answer>", text, re.DOTALL)
        if think_match or answer_match:
            if think_match:
                logger.info(f"\033[3;38;5;246mThinking: {think_match.group(1).strip()}\033[0m")
            logger.info(f"\033[1;35mAI:\033[0m {answer_match.group(1).strip() if answer_match else text}")
        else:
            logger.info(f"\033[1;35mAI:\033[0m {text}")
```

**NOTE**: This example uses ANSI escape sequences optimized for console output. These escape codes will appear as noise in log files.
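
If the same logger also writes to a file, one option is to strip the escape sequences in a formatter attached to the file handler. The sketch below uses only the standard library. `strip_ansi` and `PlainFormatter` are hypothetical names, and the pattern covers only the color/style sequences used in the example above.

```python
import logging
import re

# Matches SGR (color/style) sequences such as "\033[1;32m" and "\033[0m"
ANSI_PATTERN = re.compile(r"\x1b\[[0-9;]*m")

def strip_ansi(text: str) -> str:
    return ANSI_PATTERN.sub("", text)

class PlainFormatter(logging.Formatter):
    """Formatter that removes ANSI color codes from the formatted record."""
    def format(self, record):
        return strip_ansi(super().format(record))

print(strip_ansi("\033[1;32mUser:\033[0m Hello"))
```

Attach it with `file_handler.setFormatter(PlainFormatter("%(asctime)s %(message)s"))` and keep the default formatter on the console handler.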


### 👀 Vision

AIAvatarKit dynamically captures an image and sends it to the AI when the AI determines that vision is required to process the request. This gives "eyes" to your AIAvatar on metaverse platforms like VRChat.

```python
# Instruct vision tag in the system message
SYSTEM_PROMPT = """
## Using Vision

If you need an image to process a user's request, you can obtain it using the following methods:

- screenshot
- camera

If an image is needed to process the request, add an instruction like [vision:screenshot] to your response to request an image from the user.

By adding this instruction, the user will provide an image in their next utterance. No comments about the image itself are necessary.

Example:

user: Look! This is the sushi I had today.
assistant: [vision:screenshot] Let me take a look.
"""

# Create AIAvatar with the system prompt
aiavatar_app = AIAvatar(
    system_prompt=SYSTEM_PROMPT,
    openai_api_key=OPENAI_API_KEY
)

# Implement get_image_url
import base64
import io
import pyautogui    # pip install pyautogui
from aiavatar.device.video import VideoDevice   # pip install opencv-python
default_camera = VideoDevice(device_index=0, width=960, height=540)

@aiavatar_app.get_image_url
async def get_image_url(source: str) -> str:
    image_bytes = None
    mime_type = None

    if source == "camera":
        # Capture a photo with the camera (saved as JPEG)
        image_bytes = await default_camera.capture_image("camera.jpg")
        mime_type = "image/jpeg"
    elif source == "screenshot":
        # Capture a screenshot and encode it as PNG
        buffered = io.BytesIO()
        image = pyautogui.screenshot(region=(0, 0, 1280, 720))
        image.save(buffered, format="PNG")
        image_bytes = buffered.getvalue()
        mime_type = "image/png"

    if image_bytes:
        # Upload the image and return its URL, or build a base64 data URL
        b64_encoded = base64.b64encode(image_bytes).decode("utf-8")
        return f"data:{mime_type};base64,{b64_encoded}"
```
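
For debugging prompts, it can be handy to check which source a response asks for. The sketch below is a hypothetical helper, not the parser AIAvatarKit uses internally. It looks for the bracket form `[vision:screenshot]` and the XML-style form `<vision source="screenshot" />`.

```python
import re
from typing import Optional

BRACKET_TAG = re.compile(r"\[vision:(\w+)\]")
XML_TAG = re.compile(r"""<vision\s+source=["'](\w+)["']\s*/>""")

def find_vision_source(text: str) -> Optional[str]:
    """Return the requested image source, or None if no vision tag is present."""
    match = BRACKET_TAG.search(text) or XML_TAG.search(text)
    return match.group(1) if match else None

print(find_vision_source("[vision:screenshot] Let me take a look."))
print(find_vision_source('<vision source="camera" /> One moment.'))
print(find_vision_source("Hello!"))
```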

> **Note:** XML-style tag is also supported: `<vision source="screenshot" />`


### 💾 Long-term Memory

A long-term memory service lets the avatar recall information from past conversations across different contexts.

To store conversation history, define a function decorated with `@aiavatar_app.sts.on_finish`. To retrieve memories from the conversation history, call the search function of the long-term memory service as a tool.

Below is an example using [ChatMemory](https://github.com/uezo/chatmemory).

```python
# Create client for ChatMemory
from aiavatar.character.memory import MemoryClient
memory_client = MemoryClient(
    base_url="http://localhost:8000"
)

# Add messages to ChatMemory service
@aiavatar_app.sts.on_finish
async def on_finish(request, response):
    await memory_client.add_messages(
        character_id=YOUR_CHARACTER_ID,  # Character ID registered via CharacterService, or any value to separate memory spaces
        request=request,
        response=response
    )

# Add MemorySearchTool to recall past events, conversations, or information about the user.
from aiavatar.character.tools import MemorySearchTool
llm.add_tool(
    MemorySearchTool(
        memory_client=memory_client,
        character_id=YOUR_CHARACTER_ID,
        debug=True
    )
)
```


### 🐓 Wakeword

Set `wakewords` when instantiating `AIAvatar`. Conversation will start when the AIAvatar recognizes one of the words in this list. You can also set `wakeword_timeout`, after which the AIAvatar will return to listening for the wakeword again.

```python
aiavatar_app = AIAvatar(
    openai_api_key=OPENAI_API_KEY,
    wakewords=["Hello", "こんにちは"],
    wakeword_timeout=60,
)
```
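
The gating behavior can be pictured with the small state machine below. It is a sketch under assumptions, not AIAvatarKit's implementation: `WakewordGate` is a hypothetical class, matching is a case-insensitive substring check, and the timeout is measured from the last accepted utterance.

```python
class WakewordGate:
    def __init__(self, wakewords, timeout):
        self.wakewords = [w.lower() for w in wakewords]
        self.timeout = timeout
        self.last_active_at = None  # None means "waiting for a wakeword"

    def should_process(self, text: str, now: float) -> bool:
        # Awake if the last accepted utterance is within the timeout window
        awake = self.last_active_at is not None and now - self.last_active_at <= self.timeout
        heard = any(w in text.lower() for w in self.wakewords)
        if awake or heard:
            self.last_active_at = now
            return True
        return False

gate = WakewordGate(["Hello", "こんにちは"], timeout=60)
print(gate.should_process("What's the weather?", now=0.0))   # ignored: no wakeword yet
print(gate.should_process("Hello there", now=1.0))           # wakeword starts the conversation
print(gate.should_process("What's the weather?", now=30.0))  # still within the timeout
print(gate.should_process("And tomorrow?", now=100.0))       # timed out, wakeword needed again
```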


### 📋 System Prompt Parameters

You can embed parameters into your system prompt dynamically.

First, define your `AIAvatar` instance with a system prompt containing placeholders:

```python
aiavatar_app = AIAvatar(
    openai_api_key="YOUR_OPENAI_API_KEY",
    model="gpt-4o",
    system_prompt="User's name is {name}."
)
```

When invoking, pass the parameters as a dictionary using `system_prompt_params`:

```python
async for response in aiavatar_app.sts.invoke(STSRequest(
    # (other fields omitted)
    system_prompt_params={"name": "Nekochan"}
)):
    await aiavatar_app.handle_response(response)
```

Placeholders in the system prompt, such as `{name}`, will be replaced with the corresponding values at runtime.
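
As a rough mental model, the substitution behaves like Python's `str.format`. This is an assumption about the mechanism, not something stated here. If it holds, literal braces in the prompt (for example in a JSON sample) would need to be doubled as `{{` and `}}`.

```python
system_prompt = "User's name is {name}."
system_prompt_params = {"name": "Nekochan"}

# Fill each {placeholder} with the matching value from the dictionary
rendered = system_prompt.format(**system_prompt_params)
print(rendered)
```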


### 🎛️ Inline LLM Parameters

When calling `LLMService.chat_stream` directly (outside the Speech-to-Speech pipeline), you can override model-specific parameters on a per-request basis using `inline_llm_params`.

```python
# Override model and temperature for a single call
async for chunk in llm.chat_stream(
    context_id="ctx_001",
    user_id="user_001",
    text="Hello!",
    inline_llm_params={"model": "gpt-4.1-mini", "temperature": 0.0}
):
    print(chunk.text, end="", flush=True)
```

The key-value pairs in `inline_llm_params` are merged into the underlying API call parameters, so any parameter accepted by the provider's API can be specified. The exact keys depend on the LLM service:

| Service | Example keys |
|---|---|
| ChatGPTService | `model`, `temperature`, `reasoning_effort`, ... |
| ClaudeService | `model`, `temperature`, `max_tokens`, ... |
| GeminiService | `model`, `config`, ... |
| LiteLLMService | `model`, `temperature`, ... |

For a practical example, see [Quick Response](#-quick-response) — `QuickResponder` uses `inline_llm_params` to disable tool calls and reasoning for fast first-response generation.
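
As a rough mental model of the merge, per-request values win over the service-level ones for that call only. The sketch below is illustrative: `build_call_params` and the base values are hypothetical, not the library's internals.

```python
def build_call_params(base_params: dict, inline_llm_params: dict = None) -> dict:
    # Later keys win, so inline values override the service-level ones
    return {**base_params, **(inline_llm_params or {})}

base_params = {"model": "gpt-4.1", "temperature": 0.7}
call_params = build_call_params(base_params, {"model": "gpt-4.1-mini", "temperature": 0.0})
print(call_params)
print(base_params)  # unchanged: the override applies to this call only
```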


### ⏰ Timestamp Insertion

You can insert timestamps into requests at regular intervals. This keeps AIAvatar responses anchored to real-world time.

```python
aiavatar_app = AIAvatar(
    vad=vad,
    stt=stt,
    llm=llm,
    tts=tts,
    timestamp_interval_seconds=600.0,   # Inserts a timestamp to the request every 600 seconds (10 minutes). Default is 0.
    timestamp_timezone="Asia/Tokyo",    # Default is 'UTC'
)
```

For example, a request of "Hello!" with timestamp insertion enabled becomes:

```
$Current date and time: 2025-12-24

Hello!
```

When `timestamp_interval_seconds` is set to 0, no timestamp is inserted (default).
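
The interval logic can be sketched as follows. This is an illustration rather than the actual implementation: `TimestampInserter` is a hypothetical class, and the exact timestamp format is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

class TimestampInserter:
    def __init__(self, interval_seconds: float):
        self.interval_seconds = interval_seconds
        self.last_inserted_at: Optional[datetime] = None

    def apply(self, text: str, now: datetime) -> str:
        if self.interval_seconds <= 0:
            return text  # feature disabled
        due = (
            self.last_inserted_at is None
            or (now - self.last_inserted_at).total_seconds() >= self.interval_seconds
        )
        if not due:
            return text
        self.last_inserted_at = now
        return f"$Current date and time: {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n{text}"

t0 = datetime(2025, 12, 24, 9, 0, 0, tzinfo=timezone.utc)
inserter = TimestampInserter(interval_seconds=600.0)
first = inserter.apply("Hello!", t0)                                # timestamp inserted
second = inserter.apply("Thanks", t0 + timedelta(seconds=60))       # within the interval
third = inserter.apply("Bye", t0 + timedelta(seconds=600))          # interval elapsed
print(first, second, third, sep="\n---\n")
```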


### 🧵 Request merging

Request merging helps prevent conversation breakdown when speech recognition produces fragmented results. When enabled, consecutive requests within a specified time window are automatically merged into a single request, improving conversation continuity and user experience.


Example without request merging:

```
User: I'm feeling hungry...
AI: Would you... (interrupted mid-sentence while saying "Would you like me to book a restaurant? The place from last time has availability")
User: Uh-huh (misrecognized from "Um..." - a hesitant sound)
AI: Booking completed. (responded to "Uh-huh" and executed restaurant booking)
User: What are you talking about??
```

Example with request merging:

```
User: I'm feeling hungry...
AI: Would you... (interrupted mid-sentence while saying "Would you like me to book a restaurant? The place from last time has availability")
User: Uh-huh (misrecognized from "Um..." - a hesitant sound)
AI: Would you like me to book a restaurant? The place from last time has availability (responding to merged request "I'm feeling hungry... Uh-huh...")
User: Yes, please!
```

To enable this feature, set `merge_request_threshold` to a value greater than 0 (in seconds).

```python
aiavatar_app.sts.merge_request_threshold = 2.0  # Merge requests within 2 seconds
```
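
Conceptually, the merge decision looks like the sketch below. It is a simplified illustration, not the pipeline's code: `PreviousRequest`, `merge_request`, the English prefix text, and the way the two texts are joined are all assumptions.

```python
from dataclasses import dataclass
from typing import Optional

# Illustrative prefix only; not necessarily the library default
MERGE_PREFIX = (
    "$The previous user request and your response were cancelled. "
    "Please respond again to the following request:\n\n"
)

@dataclass
class PreviousRequest:
    text: str
    received_at: float  # seconds

def merge_request(previous: Optional[PreviousRequest], text: str, now: float,
                  threshold: float, prefix: str = MERGE_PREFIX) -> str:
    if threshold <= 0 or previous is None:
        return text  # merging disabled, or nothing to merge with
    if now - previous.received_at > threshold:
        return text  # too far apart: treat as a new request
    return f"{prefix}{previous.text}\n{text}"

prev = PreviousRequest("I'm feeling hungry...", received_at=10.0)
print(merge_request(prev, "Uh-huh", now=11.5, threshold=2.0))
```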

You can also customize the merge prefix message. Here's an example of setting the prefix in Japanese:

```python
aiavatar_app.sts.merge_request_prefix = "$直前のユーザーの要求とあなたの応答はキャンセルされました。以下の要求に対して、あらためて応答しなおしてください:\n\n"
```

**NOTE**: Files from the previous request are preserved in the merged request.


### 📥 Invoke Queue

AIAvatarKit provides three invoke modes for handling concurrent requests. By default, new requests interrupt any ongoing response. With queue mode enabled, you can control whether requests wait in line or still interrupt.

#### Invoke Modes

| Mode | Settings | Behavior |
|------|----------|----------|
| **Direct** (default) | `use_invoke_queue=False` | New requests immediately interrupt the current response. Suitable for most use cases. |
| **Queued (Interrupt)** | `use_invoke_queue=True`, `wait_in_queue=False` | Requests are queued but clear previous pending requests. The current response is interrupted. Default behavior when queue mode is enabled. |
| **Queued (Wait)** | `use_invoke_queue=True`, `wait_in_queue=True` | Requests wait in queue until previous ones complete. No interruption occurs. Useful when you need sequential processing, such as sending a follow-up request (e.g., with an image requested by the server) without interrupting the current response. |

#### Configuration

Enable queue mode on the pipeline:

```python
from aiavatar.sts import STSPipeline

pipeline = STSPipeline(
    # ... other settings ...
    use_invoke_queue=True,              # Enable queue mode
    invoke_queue_idle_timeout=10.0,     # Worker stops after 10s of inactivity
    invoke_timeout=60.0,                # Maximum time for a single invoke
)
```

Or on the AIAvatar instance:

```python
aiavatar_app = AIAvatar(
    openai_api_key=OPENAI_API_KEY,
    use_invoke_queue=True,
)
```

#### Per-Request Behavior

When queue mode is enabled, control per-request behavior via `wait_in_queue`:

```python
from aiavatar.sts import STSRequest

# Interrupt mode (default): clears queue and interrupts current response
request = STSRequest(
    session_id="session1",
    text="Hello!",
    wait_in_queue=False  # default
)

# Wait mode: queues and waits for previous requests to complete
request = STSRequest(
    session_id="session1",
    text="What's next?",
    wait_in_queue=True
)
```
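
The difference between the two queued modes can be modeled with a plain queue. The sketch below is a toy model: `SessionQueue` is hypothetical, and it only tracks pending requests without modeling the interruption of the response currently being generated.

```python
from collections import deque

class SessionQueue:
    """Toy model of one session's pending requests."""
    def __init__(self):
        self.pending = deque()

    def submit(self, text: str, wait_in_queue: bool = False) -> list:
        cancelled = []
        if not wait_in_queue:
            # Interrupt mode: drop everything that was still waiting
            cancelled = list(self.pending)
            self.pending.clear()
        self.pending.append(text)
        return cancelled

queue = SessionQueue()
queue.submit("first", wait_in_queue=True)
queue.submit("second", wait_in_queue=True)
cancelled = queue.submit("urgent")  # wait_in_queue=False clears the line
print(cancelled, list(queue.pending))
```

In the real pipeline, the cleared requests are the ones that receive a `type="cancelled"` response.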

#### Caveats

- **Python 3.11+ required**: Queue mode uses `asyncio.timeout()` which is only available in Python 3.11 and later.
- **Session-based queues**: Each session has its own independent queue. Requests from different sessions do not affect each other.
- **Do not mix modes**: The `use_invoke_queue` setting should remain consistent for a pipeline instance. Changing it at runtime is not supported.
- **Cancelled responses**: When a queued request is cleared (by a non-waiting request), it receives a response with `type="cancelled"`.


### 🧺 Shared Context

Context is typically shared only between an individual user and the AI character. AIAvatarKit also lets you store histories under a shared context ID and load them into every user's conversation, so that the same context is common to all users.

This lets you inject context with general events that are independent of any single user interaction, such as public news or actions the AI character has taken.

```python
# Add character-wide shared messages identified by context_id="shared_context_id"
from datetime import datetime
from zoneinfo import ZoneInfo

now = datetime.now(ZoneInfo(self.timezone))
await self.llm.context_manager.add_histories(
    context_id="shared_context_id",
    data_list=[
        {
            "role": "user",
            "content": f"$Current datetime: {now.strftime('%Y/%m/%d %H:%M:%S')}\nToday's news: {news}"
        },
        {
            "role": "assistant",
            "content": "I recognized current datetime and today's news."
        },
    ],
    context_schema="chatgpt"
)
```

```python
# Pass "shared_context_id" via `shared_context_ids` to load the shared history
llm = ChatGPTService(
    openai_api_key=OPENAI_API_KEY,
    system_prompt="You are a helpful virtual assistant.",
    model="gpt-4.1",
    shared_context_ids=["shared_context_id"]
)
```


### 🔗 Channel Context Bridge

`ChannelContextBridge` maps channel-specific user IDs (e.g. LINE user ID, Twilio phone number) to app-level user IDs and persists conversation context (`context_id`) per user across channels. This is essential when some channels (e.g. Twilio) cannot pass `context_id` from the client side.

It manages two separate concerns:
- **Channel Users**: keyed by `(channel_id, channel_user_id)`, maps to an app-level `user_id` with arbitrary `data`.
- **User Contexts**: keyed by `user_id`, stores `context_id` with automatic expiry based on `timeout` (default: 3600 seconds).

The LINE adapter uses `ChannelContextBridge` internally. For WebSocket or other adapters, use `bind()` to automatically sync context via adapter hooks:

```python
from aiavatar.adapter.channel_context_bridge import SQLiteChannelContextBridge

bridge = SQLiteChannelContextBridge(db_path="aiavatar.db", timeout=3600)
bridge.bind(aiavatar_app, channel_id="websocket")
```

Or register hooks manually:

```python
from aiavatar.adapter.channel_context_bridge import SQLiteChannelContextBridge, UserContext

bridge = SQLiteChannelContextBridge(db_path="aiavatar.db", timeout=3600)

@aiavatar_app.on_session_start
async def on_session_start(request, session_data):
    if not request.user_id:
        return

    channel_user = await bridge.get_channel_user("websocket", request.user_id, auto_create=True)

    # Restore application-level user_id if mapped
    if channel_user.user_id != request.user_id:
        request.user_id = channel_user.user_id

    # Restore context_id
    ctx = await bridge.get_context(request.user_id)
    if ctx and ctx.context_id:
        request.context_id = ctx.context_id

@aiavatar_app.on_response
async def on_response(response, _):
    if response.type == "start" and response.user_id and response.context_id:
        await bridge.upsert_context(UserContext(
            user_id=response.user_id,
            context_id=response.context_id,
        ))
```

**Auto-create behavior**: When `get_channel_user()` is called with `auto_create=True` and no matching record exists, a new channel user is automatically created. By default, the channel user ID is used as the app-level user ID (e.g. a LINE user ID becomes the app-level user ID). To generate a custom user ID instead, use the `create_user_id` decorator:

```python
from uuid import uuid4

@bridge.create_user_id
def create_user_id(channel_id, channel_user_id):
    return str(uuid4())
```

The function receives `(channel_id, channel_user_id)` and returns the app-level user ID to assign.

**Cross-channel context sharing**: When the same `user_id` is linked across multiple channels, they share the same `context_id`, maintaining conversation continuity. Use `link_channel_user()` to map different channel user IDs to a single app-level user.
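
The two mappings and the expiry rule can be pictured with an in-memory model. This is a sketch for intuition only: `InMemoryBridge` is a hypothetical class, and its method signatures are simplified stand-ins rather than the `ChannelContextBridge` API.

```python
import time
from typing import Dict, Optional, Tuple

class InMemoryBridge:
    def __init__(self, timeout: float = 3600):
        self.timeout = timeout
        # (channel_id, channel_user_id) -> app-level user_id
        self.channel_users: Dict[Tuple[str, str], str] = {}
        # user_id -> (context_id, updated_at)
        self.contexts: Dict[str, Tuple[str, float]] = {}

    def link_channel_user(self, channel_id: str, channel_user_id: str, user_id: str) -> None:
        self.channel_users[(channel_id, channel_user_id)] = user_id

    def get_user_id(self, channel_id: str, channel_user_id: str) -> str:
        # Auto-create: the channel user ID doubles as the app-level user ID
        return self.channel_users.setdefault((channel_id, channel_user_id), channel_user_id)

    def upsert_context(self, user_id: str, context_id: str, now: Optional[float] = None) -> None:
        self.contexts[user_id] = (context_id, time.time() if now is None else now)

    def get_context(self, user_id: str, now: Optional[float] = None) -> Optional[str]:
        entry = self.contexts.get(user_id)
        if entry is None:
            return None
        context_id, updated_at = entry
        current = time.time() if now is None else now
        # Expired contexts are treated as absent
        return context_id if current - updated_at <= self.timeout else None

bridge = InMemoryBridge(timeout=3600)
bridge.link_channel_user("line", "U123", "user_1")
bridge.link_channel_user("twilio", "+15550100", "user_1")
bridge.upsert_context(bridge.get_user_id("line", "U123"), "ctx_1", now=0.0)
# The phone call picks up the context that was started on LINE
print(bridge.get_context(bridge.get_user_id("twilio", "+15550100"), now=100.0))
```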

**PostgreSQL backend**:

```python
from aiavatar.adapter.channel_context_bridge.postgres import PostgreSQLChannelContextBridge

bridge = PostgreSQLChannelContextBridge(
    host="localhost",
    port=5432,
    dbname="aiavatar",
    user="postgres",
    password="your_password",
    timeout=3600,
)
```


### 📡 Channel-aware Processing

When your AI avatar serves multiple channels (WebSocket, phone, SMS, LINE, etc.), you can make the pipeline aware of which channel each request comes from.

#### Channel Tag Insertion

Enable `insert_channel_tag` to automatically prepend a `<channel>` tag to the user's message before sending it to the LLM. This lets the LLM adjust its response style based on the channel.

```python
# Twilio adapter (channel defaults to "phone")
app = AIAvatarTwilioServer(
    channel="phone",
    insert_channel_tag=True,
)
```

With `insert_channel_tag=True`, the LLM receives input like:

```
<channel name='phone' />Hello, how are you?
```
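
The transformation itself is a simple prefix. The sketch below reproduces the format shown above; the function is a hypothetical helper for illustration, not the pipeline's code.

```python
from typing import Optional

def insert_channel_tag(text: str, channel: Optional[str]) -> str:
    # Leave the text untouched when no channel is known
    if not channel:
        return text
    return f"<channel name='{channel}' />{text}"

print(insert_channel_tag("Hello, how are you?", "phone"))
```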

You can instruct the LLM in the system prompt to behave differently per channel:

```
When <channel name='sms' />, keep responses short and text-friendly.
When <channel name='phone' />, use natural conversational language.
```

For voice-based adapters (WebSocket, Twilio), the channel is stored in VAD session data and automatically set on each request. For text-based adapters (LINE Bot), the channel is set directly on the request.

#### Skip TTS for Text Channels

The Twilio adapter skips TTS for SMS by default (`skip_tts_channels=["sms"]`), since text messages don't need speech synthesis.

```python
# Default: skips TTS for SMS
app = AIAvatarTwilioServer()

# Customize which channels skip TTS
app = AIAvatarTwilioServer(
    skip_tts_channels=["sms", "chat"],
)
```

In an omni-channel setup, you can also configure this on the pipeline directly:

```python
app.sts.skip_tts_channels = ["sms", "linebot"]
```


### 🔈 Audio device

You can select the audio devices that components use by device index.

First, list the available devices to find the indexes you want to use.

```sh
$ python

>>> from aiavatar import AudioDevice
>>> AudioDevice().list_audio_devices()
{'index': 0, 'name': '外部マイク', 'max_input_channels': 1, 'max_output_channels': 0, 'default_sample_rate': 44100.0}
{'index': 1, 'name': '外部ヘッドフォン', 'max_input_channels': 0, 'max_output_channels': 2, 'default_sample_rate': 44100.0}
{'index': 2, 'name': 'MacBook Airのマイク', 'max_input_channels': 3, 'max_output_channels': 0, 'default_sample_rate': 44100.0}
{'index': 3, 'name': 'MacBook Airのスピーカー', 'max_input_channels': 0, 'max_output_channels': 2, 'default_sample_rate': 44100.0}
```

Then pass the indexes to `AIAvatar`.

```python
aiavatar_app = AIAvatar(
    input_device=2,     # MacBook Air microphone (MacBook Airのマイク)
    output_device=3,    # MacBook Air speakers (MacBook Airのスピーカー)
    openai_api_key=OPENAI_API_KEY
)
```


### 🐆 Quick Response

To reduce the first response latency, `QuickResponder` generates a short acknowledgment phrase (e.g. "Sure!" or "なるほど。") and sends it to the user immediately, before the main LLM response is ready. This keeps the conversation feeling responsive while the full answer is being generated.

```python
from aiavatar.sts import QuickResponder, DEFAULT_QUICK_RESPONSE_PROMPT_PREFIX_JA, DEFAULT_REQUEST_PREFIX_JA
from aiavatar.sts.models import STSRequest

quick_responder = QuickResponder(
    llm=llm,
    tts=tts,
    quick_response_prompt_prefix=DEFAULT_QUICK_RESPONSE_PROMPT_PREFIX_JA,
    request_prefix=DEFAULT_REQUEST_PREFIX_JA
)

@aiavatar_app.sts.on_before_llm
async def on_before_llm(request: STSRequest):
    await quick_responder.respond(request)
```

`QuickResponder` uses the provided LLM to generate a brief phrase and synthesizes it with the provided TTS (with caching). The generated quick response is stored in the request and yielded by the pipeline as the first chunk. `QuickResponder` then rewrites `request.text` so that the main LLM response continues naturally without repeating the quick response.

> **Note:** If the main LLM response occasionally includes the quick response content, adding few-shot examples to the initial messages can help stabilize the behavior. You can set them directly via `llm.initial_messages`, or use `CharacterLoader.format_messages` to extend the messages when using `CharacterLoader`.
>
> ```python
> @character_loader.format_messages
> def format_messages(messages):
>     messages.append({"role": "user", "content": quick_responder.quick_response_prompt_prefix + "\n\nHello!"})
>     messages.append({"role": "assistant", "content": "Hello!"})
>     messages.append({"role": "user", "content": quick_responder.request_prefix + "\n\nHello!"})
>     messages.append({"role": "assistant", "content": "<think>Respond warmly to the greeting.</think><answer>Hello! How can I help you today?</answer>"})
>     messages.append({"role": "user", "content": "You repeated 'Hello!' which was already sent. Always continue from where the previous output left off."})
>     messages.append({"role": "assistant", "content": "<think>Noted the mistake. Will not repeat already-sent text next time.</think><answer>Got it.</answer>"})
>     return messages
> ```

#### QuickResponderPro

`QuickResponderPro` is a performance-tuned variant that bypasses `LLMService` and calls the OpenAI-compatible API directly with `stream=False`. It manages its own context through a dedicated `ContextManager`, cleans conversation history for few-shot learning, and supports a custom system prompt — giving you full control over how quick responses are generated.

```python
from aiavatar.sts.quick_responder.pro import QuickResponderPro, DEFAULT_QRP_SYSTEM_PROMPT_JA
from aiavatar.sts.llm.context_manager.postgres import PostgreSQLContextManager
from aiavatar.sts.models import STSRequest

quick_responder_pro = QuickResponderPro(
    api_key="YOUR_OPENAI_API_KEY",
    model="gpt-4.1-nano",
    tts=tts,
    context_manager=PostgreSQLContextManager(get_pool=pool_provider.get_pool),
    language="ja",
    system_prompt=DEFAULT_QRP_SYSTEM_PROMPT_JA + "\n\n# Character\nYour character description here.",
    timeout=1.5,
)

@aiavatar_app.sts.on_before_llm
async def on_before_llm(request: STSRequest):
    await quick_responder_pro.respond(request)
```

**How it works:**

1. Builds messages from system prompt + cleaned history + user utterance
2. Calls the API with `stream=False` for minimum latency
3. Synthesizes the response with TTS (with caching)
4. Rewrites `request.text` with a deduplication prefix so the main LLM continues naturally

**Pre-generation during silence:** When using `SileroStreamSpeechDetector`, you can start generating the quick response during the segment silence period — before turn-end is confirmed. This overlaps LLM + TTS work with the remaining silence wait, noticeably reducing perceived latency.

```python
@vad.on_speech_detecting
async def on_speech_detecting(text, vad_session):
    await quick_responder_pro.create_generation_task(
        text,
        vad_session.session_id,
        vad_session.data.get("context_id")
    )
```

If the user resumes speaking, the pending task is automatically cancelled and a new one starts. If the user stays silent and turn-end is confirmed, `respond()` picks up the pre-generated result instead of generating from scratch.
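
The cancel-and-replace behavior described above can be pictured with plain `asyncio` tasks. This is an illustrative sketch only: `PreGenerator`, `start`, and `take` are hypothetical names, and the sleep stands in for the real LLM and TTS work.

```python
import asyncio
from typing import Dict


class PreGenerator:
    """Keeps at most one pending generation task per session (illustrative only)."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}

    async def _generate(self, text: str) -> str:
        # Stand-in for the LLM + TTS work.
        await asyncio.sleep(0.05)
        return f"ack for: {text}"

    def start(self, session_id: str, text: str) -> None:
        # A newer partial utterance supersedes the pending task.
        old = self._tasks.get(session_id)
        if old is not None and not old.done():
            old.cancel()
        self._tasks[session_id] = asyncio.create_task(self._generate(text))

    async def take(self, session_id: str, text: str) -> str:
        # Reuse the pre-generated result when one exists; otherwise generate now.
        task = self._tasks.pop(session_id, None)
        if task is not None:
            return await task
        return await self._generate(text)


async def demo() -> str:
    pre = PreGenerator()
    pre.start("s1", "What's the")           # first silence segment
    pre.start("s1", "What's the weather?")  # user resumed; first task is cancelled
    return await pre.take("s1", "What's the weather?")


result = asyncio.run(demo())
print(result)
```

Only the most recent task is ever kept per session, so a cancelled task is never awaited.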

**History cleaning:** When reading back conversation history, `QuickResponderPro` automatically cleans it for the QR context:
- **Quick response turns** (prompt_prefix) — kept as-is, serving as few-shot examples
- **Main LLM turns** (request_prefix) — replaced with a short continuation message to avoid confusing duplicate utterances
- **Assistant content** — `<think>`/`<answer>` tags and `[control:tags]` are stripped to plain text
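
As a rough illustration of the last rule, the sketch below reduces assistant content to plain text with regular expressions. It is not the library's implementation: `to_plain_text` is a hypothetical function, and it assumes that the content inside `<think>` is dropped, that the content inside `<answer>` is kept, and that control tags look like `[name:value]`.

```python
import re

# Assumption: the whole <think>...</think> block is discarded.
THINK_BLOCK = re.compile(r"<think>.*?</think>", re.DOTALL)
# Assumption: only the <answer> tags are removed; their content is kept.
ANSWER_TAGS = re.compile(r"</?answer>")
# Assumption: control tags have the form [name:value], e.g. [face:joy].
CONTROL_TAG = re.compile(r"\[\w+:[^\]]*\]")


def to_plain_text(content: str) -> str:
    """Strip reasoning blocks, answer tags, and control tags from assistant content."""
    text = THINK_BLOCK.sub("", content)
    text = ANSWER_TAGS.sub("", text)
    text = CONTROL_TAG.sub("", text)
    # Collapse whitespace left behind by the removed tags.
    return re.sub(r"\s+", " ", text).strip()


sample = "<think>Greet warmly.</think><answer>[face:joy]Hello! How can I help?</answer>"
print(to_plain_text(sample))
```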

**Azure OpenAI / Custom client:** You can pass a pre-configured client instead of `api_key`/`base_url`:

```python
from openai import AsyncAzureOpenAI

quick_responder_pro = QuickResponderPro(
    client=AsyncAzureOpenAI(
        api_key="YOUR_AZURE_API_KEY",
        api_version="2025-01-01-preview",
        azure_endpoint="https://your-resource.openai.azure.com"  # resource base URL only; the client appends the deployment path and API version
    ),
    model="your-deployment-name",
    tts=tts,
    context_manager=context_manager,
)
```

**extra_body:** For providers that require additional request parameters (e.g. disabling thinking for Claude), pass them via `extra_body`:

```python
quick_responder_pro = QuickResponderPro(
    api_key="YOUR_ANTHROPIC_API_KEY",
    base_url="https://api.anthropic.com/v1/",
    model="claude-haiku-4-5",
    extra_body={"thinking": {"type": "disabled"}},
    tts=tts,
    context_manager=context_manager,
)
```

> **Note:** As with `QuickResponder`, adding few-shot examples to the main LLM's initial messages helps prevent the main response from repeating the quick response. Use `CharacterLoader.format_messages` or set `llm.initial_messages` directly:
>
> ```python
> @character_loader.format_messages
> def format_messages(messages):
>     messages.append({"role": "user", "content": quick_responder_pro.prompt_prefix + "\n\nHello!"})
>     messages.append({"role": "assistant", "content": f"<think>{quick_responder_pro.think_tag_content}</think><answer>Hello!</answer>"})
>     messages.append({"role": "user", "content": quick_responder_pro.request_prefix.format(quick_response_text="Hello!") + "\n\nHello!"})
>     messages.append({"role": "assistant", "content": "<think>Respond warmly to the greeting.</think><answer>How can I help you today?</answer>"})
>     messages.append({"role": "user", "content": "You repeated 'Hello!' which was already sent. Always continue from where the previous output left off."})
>     messages.append({"role": "assistant", "content": "<think>Noted the mistake. Will not repeat already-sent text next time.</think><answer>Got it.</answer>"})
>     return messages
> ```


### 🎭 Custom Behavior

You can register custom handlers with the `on_response(response_type)` decorator. The following example shows a "thinking" facial expression while the request is being processed, which makes the interaction with the AI avatar feel more natural.

```python
# Show the thinking face while the character is preparing its answer
@aiavatar_app.on_response("start")
async def on_start_response(response):
    await aiavatar_app.face_controller.set_face("thinking", 3.0)

# Reset face before answering
@aiavatar_app.on_response("chunk")
async def on_chunk_response(response):
    if response.metadata.get("is_first_chunk"):
        aiavatar_app.face_controller.reset()
```


### ✅ Request Validation

You can filter out unwanted requests before they reach the LLM by implementing a `validate_request` hook. Return a reason string to cancel the request, or `None` to proceed.

```python
from aiavatar.sts.models import STSRequest

@aiavatar_app.sts.validate_request
async def validate_request(request: STSRequest):
    # Reject text that is too short
    if len(request.text) < 3:
        return "Text too short"

    # Reject requests with too many files
    if request.files and len(request.files) > 5:
        return "Too many files attached"

    # Reject specific users
    if request.user_id == "blocked_user":
        return "User is blocked"

    return None  # Proceed with the request
```

This is useful for:
- Filtering out noise or accidental triggers (e.g., coughs, short utterances)
- Limiting file attachments
- Implementing user-based access control
- Any custom validation logic based on `STSRequest` fields
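
Because a hook only needs to return a reason string or `None`, individual checks are easy to compose and to test without running the pipeline. The sketch below uses a stand-in request class rather than the real `STSRequest`; `FakeRequest`, `first_rejection`, and the individual check functions are hypothetical names introduced for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, List, Optional, Sequence


@dataclass
class FakeRequest:
    """Stand-in with a few fields mirroring the example above (not STSRequest)."""
    text: str
    user_id: str = "user"
    files: List[str] = field(default_factory=list)


Validator = Callable[[FakeRequest], Awaitable[Optional[str]]]


async def too_short(request: FakeRequest) -> Optional[str]:
    return "Text too short" if len(request.text) < 3 else None


async def blocked(request: FakeRequest) -> Optional[str]:
    return "User is blocked" if request.user_id == "blocked_user" else None


async def first_rejection(request: FakeRequest, validators: Sequence[Validator]) -> Optional[str]:
    """Run checks in order and return the first rejection reason, or None."""
    for validate in validators:
        reason = await validate(request)
        if reason is not None:
            return reason
    return None


checks = [too_short, blocked]
print(asyncio.run(first_rejection(FakeRequest("ok"), checks)))
print(asyncio.run(first_rejection(FakeRequest("hello", user_id="blocked_user"), checks)))
print(asyncio.run(first_rejection(FakeRequest("hello"), checks)))
```

A single `validate_request` hook could delegate to a helper like `first_rejection` to keep each rule small and independently testable.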

#### Early Validation with AzureStreamSpeechDetector

When using `AzureStreamSpeechDetector`, you can validate recognized text even earlier—before the STS pipeline is invoked. This is more efficient for filtering out short or invalid utterances since it skips the entire pipeline processing.

```python
from aiavatar.sts.vad.azure_stream import AzureStreamSpeechDetector

speech_detector = AzureStreamSpeechDetector(
    azure_subscription_key=AZURE_SUBSCRIPTION_KEY,
    azure_region=AZURE_REGION,
    azure_language="ja-JP",
)

@speech_detector.validate_recognized_text
def validate_recognized_text(text: str) -> str | None:
    # Reject text that is too short
    if len(text) < 3:
        return "Text too short"

    # Reject specific patterns (e.g., filler words)
    if text in ["えーと", "あの", "うーん"]:
        return "Filler word detected"

    return None  # Proceed with the request
```

> **Note:** The decorated function must be synchronous (not `async`) because it runs within the Azure Speech SDK's callback thread.


### 🎚️ Noise Filter

AIAvatarKit automatically adjusts the noise filter for listeners when you instantiate an AIAvatar object. To manually set the noise filter level for voice detection, set `auto_noise_filter_threshold` to `False` and specify the `volume_threshold_db` in decibels (dB).

```python
aiavatar_app = AIAvatar(
    openai_api_key=OPENAI_API_KEY,
    auto_noise_filter_threshold=False,
    volume_threshold_db=-40   # Set the voice detection threshold to -40 dB
)
```
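
To get a feel for what a threshold such as -40 dB means, the sketch below measures the RMS level of 16-bit PCM audio relative to full scale (dBFS). This is an assumption about how such a level can be computed, not a description of AIAvatarKit's exact formula; `rms_dbfs` and `sine` are hypothetical helpers.

```python
import math
import struct


def rms_dbfs(pcm16: bytes) -> float:
    """RMS level of little-endian 16-bit mono PCM, in dB relative to full scale."""
    count = len(pcm16) // 2
    if count == 0:
        return float("-inf")
    samples = struct.unpack(f"<{count}h", pcm16[: count * 2])
    rms = math.sqrt(sum(s * s for s in samples) / count)
    if rms == 0:
        return float("-inf")
    return 20 * math.log10(rms / 32768.0)


def sine(amplitude: int, n: int = 1600, freq: float = 440.0, rate: int = 16000) -> bytes:
    """Generate n samples of a sine wave as 16-bit PCM (test signal)."""
    values = [int(amplitude * math.sin(2 * math.pi * freq * i / rate)) for i in range(n)]
    return struct.pack(f"<{n}h", *values)


threshold_db = -40
quiet = rms_dbfs(sine(100))    # about -53 dBFS: below the threshold
loud = rms_dbfs(sine(10000))   # about -13 dBFS: above the threshold

for name, level in [("quiet", quiet), ("loud", loud)]:
    verdict = "voice candidate" if level > threshold_db else "filtered out"
    print(f"{name}: {level:.1f} dB -> {verdict}")
```

Raising the threshold toward 0 dB makes detection less sensitive; lowering it lets quieter sounds through.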


### 🔄 Migration Guide: From v0.6.x to v0.7.0

In version **v0.7.0**, the internal Speech-to-Speech pipeline previously provided by the external `LiteSTS` library has been fully integrated into AIAvatarKit.

#### What Changed?

- The functionality remains the same — **no API behavior changes**.
- However, **import paths have been updated**.

#### 🔧 Required Changes

All imports from `litests` should now be updated to `aiavatar.sts`.

For example:

```python
# Before
from litests import STSRequest, STSResponse
from litests.llm.chatgpt import ChatGPTService

# After
from aiavatar.sts import STSRequest, STSResponse
from aiavatar.sts.llm.chatgpt import ChatGPTService
```

This change ensures compatibility with the new internal structure and removes the need for `LiteSTS` as a separate dependency.
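
For larger codebases, the rename can be applied mechanically. The following is a minimal sketch, assuming that `litests` only appears as the root module of `import`/`from` statements; `migrate_imports` is a hypothetical helper, and its output should be reviewed before committing.

```python
import re

# Matches `litests` only as the root module of an import statement.
LITESTS_IMPORT = re.compile(r"^(\s*(?:from|import)\s+)litests\b", re.MULTILINE)


def migrate_imports(source: str) -> str:
    """Rewrite `litests` imports to `aiavatar.sts` in a source string."""
    return LITESTS_IMPORT.sub(r"\1aiavatar.sts", source)


before = (
    "from litests import STSRequest, STSResponse\n"
    "from litests.llm.chatgpt import ChatGPTService\n"
)
print(migrate_imports(before))
```

Note that a bare `import litests` becomes `import aiavatar.sts`, so any code that refers to `litests.` by name still needs a manual update.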
