Coverage for agentos/channels/base.py: 0%
63 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS Channels — 基础适配器协议。
4所有渠道适配器均实现本协议,确保 MessageGateway 零差异调用。
5"""
7from __future__ import annotations
9import hashlib
10import hmac
11import json
12import time
13from abc import ABC, abstractmethod
14from dataclasses import dataclass, field
15from typing import Optional, Callable, Awaitable
17from agentos.channels.message import ChannelMessage, ChannelType, ConversationContext
20@dataclass
21class ChannelConfig:
22 """渠道配置。"""
23 channel: ChannelType
24 enabled: bool = False
25 webhook_path: str = "" # 接收 webhook 的 URL 路径
26 webhook_port: int = 8080
27 verify_token: str = "" # 签名校验 token
28 app_id: str = ""
29 app_secret: str = ""
30 encoding_aes_key: str = "" # 加解密 key(微信系)
31 corp_id: str = ""
32 agent_id: str = ""
33 bot_app_id: str = ""
34 bot_token: str = ""
35 bot_secret: str = ""
36 extra: dict = field(default_factory=dict)
38 def to_dict(self) -> dict:
39 return {
40 "channel": self.channel.value,
41 "enabled": self.enabled,
42 "webhook_path": self.webhook_path,
43 "webhook_port": self.webhook_port,
44 "app_id": self.app_id,
45 }
48@dataclass
49class ReplyResult:
50 """回复结果。"""
51 success: bool
52 msg_id: str = ""
53 error: str = ""
56CallbackType = Callable[[ChannelMessage], Awaitable[Optional[str]]]
57"""消息回调: 接收 ChannelMessage,返回可选的同步回复文本。"""
60class BaseChannelAdapter(ABC):
61 """渠道适配器基类。
63 每个渠道适配器负责:
64 1. 接收 webhook → 验证签名 → 解析为 ChannelMessage
65 2. 将 Agent Engine 的回复发送回渠道
66 3. Token 管理与自动续期
67 """
69 channel_type: ChannelType
70 config: ChannelConfig
71 _on_message: Optional[CallbackType] = None
73 def __init__(self, config: ChannelConfig):
74 self.config = config
76 def set_callback(self, cb: CallbackType):
77 """设置收到消息时的回调。"""
78 self._on_message = cb
80 # ── Webhook 入口 ──
82 @abstractmethod
83 def verify_signature(self, raw_body: bytes, headers: dict) -> bool:
84 """验证 webhook 签名。返回 True 表示验证通过。"""
85 ...
87 @abstractmethod
88 def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]:
89 """解析 webhook 原始报文为 ChannelMessage(s)。"""
90 ...
92 # ── 被动回复(同步,在 webhook 响应中返回)──
94 @abstractmethod
95 def build_reply(self, msg: ChannelMessage, reply_text: str) -> str:
96 """构建被动回复报文(xml/json)。"""
97 ...
99 # ── 主动推送(异步,通过 API 发送)──
101 @abstractmethod
102 async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult:
103 """主动推送消息到用户。"""
104 ...
106 @abstractmethod
107 async def send_image(self, user_id: str, image_url: str) -> ReplyResult:
108 """推送图片。"""
109 ...
111 @abstractmethod
112 async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult:
113 """推送文件。"""
114 ...
116 # ── Token 管理 ──
118 @abstractmethod
119 async def get_access_token(self) -> str:
120 """获取/刷新 access_token。"""
121 ...
123 # ── 工具方法 ──
125 @staticmethod
126 def make_signature(token: str, timestamp: str, nonce: str, *args: str) -> str:
127 """通用签名算法(微信/企微/飞书/钉钉均适用)。"""
128 parts = sorted([token, timestamp, nonce] + list(args))
129 return hashlib.sha1("".join(parts).encode()).hexdigest()
131 @staticmethod
132 def hmac_sha256(key: str, data: str) -> str:
133 return hmac.new(key.encode(), data.encode(), hashlib.sha256).hexdigest()