Coverage for agentos/channels/base.py: 0%

63 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS Channels — 基础适配器协议。 

3 

4所有渠道适配器均实现本协议,确保 MessageGateway 零差异调用。 

5""" 

6 

7from __future__ import annotations 

8 

9import hashlib 

10import hmac 

11import json 

12import time 

13from abc import ABC, abstractmethod 

14from dataclasses import dataclass, field 

15from typing import Optional, Callable, Awaitable 

16 

17from agentos.channels.message import ChannelMessage, ChannelType, ConversationContext 

18 

19 

20@dataclass 

21class ChannelConfig: 

22 """渠道配置。""" 

23 channel: ChannelType 

24 enabled: bool = False 

25 webhook_path: str = "" # 接收 webhook 的 URL 路径 

26 webhook_port: int = 8080 

27 verify_token: str = "" # 签名校验 token 

28 app_id: str = "" 

29 app_secret: str = "" 

30 encoding_aes_key: str = "" # 加解密 key(微信系) 

31 corp_id: str = "" 

32 agent_id: str = "" 

33 bot_app_id: str = "" 

34 bot_token: str = "" 

35 bot_secret: str = "" 

36 extra: dict = field(default_factory=dict) 

37 

38 def to_dict(self) -> dict: 

39 return { 

40 "channel": self.channel.value, 

41 "enabled": self.enabled, 

42 "webhook_path": self.webhook_path, 

43 "webhook_port": self.webhook_port, 

44 "app_id": self.app_id, 

45 } 

46 

47 

48@dataclass 

49class ReplyResult: 

50 """回复结果。""" 

51 success: bool 

52 msg_id: str = "" 

53 error: str = "" 

54 

55 

56CallbackType = Callable[[ChannelMessage], Awaitable[Optional[str]]] 

57"""消息回调: 接收 ChannelMessage,返回可选的同步回复文本。""" 

58 

59 

60class BaseChannelAdapter(ABC): 

61 """渠道适配器基类。 

62 

63 每个渠道适配器负责: 

64 1. 接收 webhook → 验证签名 → 解析为 ChannelMessage 

65 2. 将 Agent Engine 的回复发送回渠道 

66 3. Token 管理与自动续期 

67 """ 

68 

69 channel_type: ChannelType 

70 config: ChannelConfig 

71 _on_message: Optional[CallbackType] = None 

72 

73 def __init__(self, config: ChannelConfig): 

74 self.config = config 

75 

76 def set_callback(self, cb: CallbackType): 

77 """设置收到消息时的回调。""" 

78 self._on_message = cb 

79 

80 # ── Webhook 入口 ── 

81 

82 @abstractmethod 

83 def verify_signature(self, raw_body: bytes, headers: dict) -> bool: 

84 """验证 webhook 签名。返回 True 表示验证通过。""" 

85 ... 

86 

87 @abstractmethod 

88 def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]: 

89 """解析 webhook 原始报文为 ChannelMessage(s)。""" 

90 ... 

91 

92 # ── 被动回复(同步,在 webhook 响应中返回)── 

93 

94 @abstractmethod 

95 def build_reply(self, msg: ChannelMessage, reply_text: str) -> str: 

96 """构建被动回复报文(xml/json)。""" 

97 ... 

98 

99 # ── 主动推送(异步,通过 API 发送)── 

100 

101 @abstractmethod 

102 async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult: 

103 """主动推送消息到用户。""" 

104 ... 

105 

106 @abstractmethod 

107 async def send_image(self, user_id: str, image_url: str) -> ReplyResult: 

108 """推送图片。""" 

109 ... 

110 

111 @abstractmethod 

112 async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult: 

113 """推送文件。""" 

114 ... 

115 

116 # ── Token 管理 ── 

117 

118 @abstractmethod 

119 async def get_access_token(self) -> str: 

120 """获取/刷新 access_token。""" 

121 ... 

122 

123 # ── 工具方法 ── 

124 

125 @staticmethod 

126 def make_signature(token: str, timestamp: str, nonce: str, *args: str) -> str: 

127 """通用签名算法(微信/企微/飞书/钉钉均适用)。""" 

128 parts = sorted([token, timestamp, nonce] + list(args)) 

129 return hashlib.sha1("".join(parts).encode()).hexdigest() 

130 

131 @staticmethod 

132 def hmac_sha256(key: str, data: str) -> str: 

133 return hmac.new(key.encode(), data.encode(), hashlib.sha256).hexdigest()