Metadata-Version: 2.4
Name: smooth-websocket-daemon
Version: 0.1.0
Summary: A FastAPI-based WebSocket library
Author-email: Your Name <your.email@example.com>
License-Expression: MIT
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: fastapi>=0.104.0
Requires-Dist: uvicorn[standard]>=0.24.0
Requires-Dist: websockets>=12.0
Provides-Extra: dev
Requires-Dist: pytest>=7.4.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: ruff>=0.1.0; extra == "dev"

# WebSocket 消息处理框架

本框架提供了用于处理 WebSocket 消息的完整解决方案，包括消息处理器（MessageHandler）和 WebSocket 运行器（WebSocketRunner）。

## 目录

- [MessageHandler 使用说明](#messagehandler-使用说明)
- [WebSocketRunner 使用说明](#websocketrunner-使用说明)
- [完整示例](#完整示例)

## MessageHandler 使用说明

### 概述

MessageHandler 提供了一套用于处理不同类型 WebSocket 消息的处理器框架。支持两种消息类型：
- **BytesMessageHandler**: 处理二进制消息（bytes）
- **TextMessageHandler**: 处理文本消息（str）

### 基本使用

#### 1. 创建自定义处理器

继承 `BytesMessageHandler` 或 `TextMessageHandler` 并实现 `handle` 方法：

```python
from transcriber.message_handler import BytesMessageHandler, TextMessageHandler

# 处理二进制消息
class MyBytesHandler(BytesMessageHandler):
    def handle(self, data: bytes) -> None:
        # 处理二进制数据
        print(f"收到二进制数据: {data}")
        # 你的处理逻辑...

# 处理文本消息
class MyTextHandler(TextMessageHandler):
    def handle(self, data: str) -> None:
        # 处理文本数据
        print(f"收到文本消息: {data}")
        # 你的处理逻辑...
```

#### 2. 使用 MessageHandlerExecutor

`MessageHandlerExecutor` 用于管理和执行消息处理器：

```python
from transcriber.message_handler import MessageHandlerExecutor, BytesMessageHandler, TextMessageHandler

# 创建执行器
executor = MessageHandlerExecutor()

# 注册处理器
bytes_handler = MyBytesHandler()
text_handler = MyTextHandler()

executor.register_bytes_handler(bytes_handler)
executor.register_text_handler(text_handler)

# 执行消息
message_bytes = {"bytes": b"hello"}
message_text = {"text": "world"}

executor.execute(message_bytes)  # 调用 bytes_handler.handle()
executor.execute(message_text)   # 调用 text_handler.handle()
```

#### 3. 检查处理器状态

```python
# 检查是否已注册处理器
if executor.has_bytes_handler():
    print("已注册 bytes 处理器")
    
if executor.has_text_handler():
    print("已注册 text 处理器")

# 获取已注册的处理器
bytes_handler = executor.get_bytes_handler()
text_handler = executor.get_text_handler()
```

### 高级用法

#### 处理多种消息类型

可以同时注册 bytes 和 text 处理器，执行器会根据消息类型自动选择合适的处理器：

```python
executor = MessageHandlerExecutor()

class BytesProcessor(BytesMessageHandler):
    def handle(self, data: bytes) -> None:
        # 处理音频数据、图片等
        process_audio(data)

class TextProcessor(TextMessageHandler):
    def handle(self, data: str) -> None:
        # 处理 JSON、命令等文本
        process_command(data)

executor.register_bytes_handler(BytesProcessor())
executor.register_text_handler(TextProcessor())

# 执行器会自动根据消息类型选择处理器
executor.execute({"bytes": b"audio data"})  # 使用 BytesProcessor
executor.execute({"text": '{"cmd": "start"}'})  # 使用 TextProcessor
```

#### 替换处理器

可以随时替换已注册的处理器：

```python
executor.register_bytes_handler(Handler1())
executor.execute({"bytes": b"data"})  # 使用 Handler1

executor.register_bytes_handler(Handler2())
executor.execute({"bytes": b"data"})  # 现在使用 Handler2
```

### 错误处理

```python
# 如果没有注册处理器就执行消息，会抛出 ValueError
executor = MessageHandlerExecutor()
try:
    executor.execute({"bytes": b"data"})
except ValueError as e:
    print(f"错误: {e}")  # "No bytes handler registered"

# 如果消息格式不正确，也会抛出错误
try:
    executor.execute({"other": "data"})
except ValueError as e:
    print(f"错误: {e}")  # "Message must contain either 'bytes' or 'text' key"
```

## WebSocketRunner 使用说明

### 概述

`WebSocketRunner` 是一个用于处理 WebSocket 连接的消息循环运行器。它从 WebSocket 连接接收消息，并通过 `MessageHandlerExecutor` 处理这些消息。

### 基本使用

#### 1. 创建并运行 Runner

```python
from starlette.websockets import WebSocket
from transcriber.message_handler import MessageHandlerExecutor, WebSocketRunner, BytesMessageHandler, TextMessageHandler

# 创建执行器和处理器
executor = MessageHandlerExecutor()

class MyBytesHandler(BytesMessageHandler):
    def handle(self, data: bytes) -> None:
        print(f"处理二进制数据: {data}")

class MyTextHandler(TextMessageHandler):
    def handle(self, data: str) -> None:
        print(f"处理文本消息: {data}")

executor.register_bytes_handler(MyBytesHandler())
executor.register_text_handler(MyTextHandler())

# 创建 runner
runner = WebSocketRunner(executor)

# 在 WebSocket 端点中使用
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    await runner.run(websocket)  # 运行消息循环
```

#### 2. 在 Starlette/FastAPI 中使用

```python
from fastapi import FastAPI, WebSocket
from transcriber.message_handler import MessageHandlerExecutor, WebSocketRunner, BytesMessageHandler

app = FastAPI()

# 创建全局执行器（或在依赖注入中创建）
executor = MessageHandlerExecutor()

class AudioHandler(BytesMessageHandler):
    def handle(self, data: bytes) -> None:
        # 处理音频数据
        process_audio_stream(data)

executor.register_bytes_handler(AudioHandler())

@app.websocket("/ws")
async def websocket_route(websocket: WebSocket):
    await websocket.accept()
    
    # 创建 runner 并运行
    runner = WebSocketRunner(executor)
    await runner.run(websocket)
```

#### 3. 获取执行器

```python
runner = WebSocketRunner(executor)
retrieved_executor = runner.get_executor()
assert retrieved_executor == executor
```

### 工作原理

1. **消息接收**: `run()` 方法持续从 WebSocket 接收消息
2. **消息过滤**: 只处理包含 `bytes` 或 `text` 键的消息，其他消息会被跳过
3. **消息执行**: 将消息传递给 `MessageHandlerExecutor` 执行
4. **连接关闭**: 当 `WebSocketDisconnect` 异常发生时，循环正常退出
5. **错误传播**: 其他异常会被重新抛出，由调用者处理

### 注意事项

- **消息格式**: Runner 只处理包含 `bytes` 或 `text` 键的消息
- **连接断开**: `WebSocketDisconnect` 会被正常处理，不会抛出异常
- **异常处理**: 执行器中的异常会向上传播，需要在业务逻辑中处理
- **异步运行**: `run()` 方法是异步的，必须在 `async` 函数中调用

### 错误处理

```python
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    try:
        runner = WebSocketRunner(executor)
        await runner.run(websocket)
    except ValueError as e:
        # 处理执行器错误（如未注册处理器）
        print(f"执行错误: {e}")
    except Exception as e:
        # 处理其他错误
        print(f"未知错误: {e}")
```

## 完整示例

### 示例 1: 简单的文本消息处理

```python
from fastapi import FastAPI, WebSocket
from transcriber.message_handler import (
    MessageHandlerExecutor,
    WebSocketRunner,
    TextMessageHandler
)

app = FastAPI()
executor = MessageHandlerExecutor()

class EchoHandler(TextMessageHandler):
    def handle(self, data: str) -> None:
        print(f"收到消息: {data}")

executor.register_text_handler(EchoHandler())

@app.websocket("/ws")
async def websocket_route(websocket: WebSocket):
    await websocket.accept()
    runner = WebSocketRunner(executor)
    await runner.run(websocket)
```

### 示例 2: 同时处理文本和二进制消息

```python
from fastapi import FastAPI, WebSocket
from transcriber.message_handler import (
    MessageHandlerExecutor,
    WebSocketRunner,
    BytesMessageHandler,
    TextMessageHandler
)

app = FastAPI()
executor = MessageHandlerExecutor()

class CommandHandler(TextMessageHandler):
    def handle(self, data: str) -> None:
        # 处理 JSON 命令
        import json
        cmd = json.loads(data)
        print(f"执行命令: {cmd}")

class AudioHandler(BytesMessageHandler):
    def handle(self, data: bytes) -> None:
        # 处理音频流
        print(f"收到音频数据: {len(data)} 字节")

executor.register_text_handler(CommandHandler())
executor.register_bytes_handler(AudioHandler())

@app.websocket("/ws")
async def websocket_route(websocket: WebSocket):
    await websocket.accept()
    runner = WebSocketRunner(executor)
    await runner.run(websocket)
```

### 示例 3: 动态处理器管理

```python
from transcriber.message_handler import MessageHandlerExecutor, WebSocketRunner

executor = MessageHandlerExecutor()

# 根据业务需求动态注册处理器
def setup_handlers(mode: str):
    if mode == "audio":
        executor.register_bytes_handler(AudioHandler())
    elif mode == "text":
        executor.register_text_handler(TextHandler())
    else:
        executor.register_bytes_handler(AudioHandler())
        executor.register_text_handler(TextHandler())

# 使用
setup_handlers("audio")
runner = WebSocketRunner(executor)
```

## API 参考

### MessageHandlerExecutor

- `register_bytes_handler(handler: BytesMessageHandler)`: 注册二进制消息处理器
- `register_text_handler(handler: TextMessageHandler)`: 注册文本消息处理器
- `execute(message: Message)`: 执行消息处理
- `has_bytes_handler() -> bool`: 检查是否已注册二进制处理器
- `has_text_handler() -> bool`: 检查是否已注册文本处理器
- `get_bytes_handler() -> Optional[BytesMessageHandler]`: 获取二进制处理器
- `get_text_handler() -> Optional[TextMessageHandler]`: 获取文本处理器

### WebSocketRunner

- `__init__(executor: MessageHandlerExecutor)`: 初始化 runner
- `run(websocket: WebSocket) -> None`: 运行消息处理循环（异步）
- `get_executor() -> MessageHandlerExecutor`: 获取关联的执行器

### BytesMessageHandler / TextMessageHandler

- `handle(data: bytes | str) -> None`: 处理消息数据（需要子类实现）
- `handle_message(message: Message) -> None`: 从消息字典中提取数据并处理

