Coverage for agentos/channels/adapters/wechat.py: 0%

82 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS Channels — 微信公众号适配器。 

3 

4Webhook 规范: https://developers.weixin.qq.com/doc/offiaccount/Message_Management/Receiving_standard_messages.html 

5 

6特性: 

7 - XML 报文解析 

8 - SHA1 签名验证 

9 - 被动回复(在 webhook 响应中同步回复) 

10 - access_token 管理 + 自动续期 

11 - 主动推送(客服消息接口) 

12""" 

13 

14from __future__ import annotations 

15 

16import json 

17import time 

18import xml.etree.ElementTree as ET 

19from typing import Optional 

20 

21import httpx 

22 

23from agentos.channels.base import ( 

24 BaseChannelAdapter, ChannelConfig, ReplyResult, 

25) 

26from agentos.channels.message import ChannelMessage, ChannelType, MessageType 

27 

28 

class WeChatAdapter(BaseChannelAdapter):
    """Channel adapter for WeChat Official Accounts.

    Covers the inbound webhook path (signature check, XML parsing, passive
    XML replies) and the outbound customer-service message API, caching the
    API access token between calls.
    """

    channel_type = ChannelType.WECHAT_MP

    _TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token"
    _CUSTOM_SEND_URL = "https://api.weixin.qq.com/cgi-bin/message/custom/send"
    # Seconds before the recorded expiry at which the cached token is
    # already treated as stale.
    _TOKEN_REFRESH_MARGIN = 300
    # Per-request timeout (seconds) for calls to the WeChat API.
    _HTTP_TIMEOUT = 10
    # WeChat errcodes reporting that the supplied access_token was rejected
    # (invalid credential / invalid access_token / access_token expired).
    _STALE_TOKEN_ERRCODES = frozenset({40001, 40014, 42001})

    def __init__(self, config: ChannelConfig):
        """Create the adapter with an empty access-token cache."""
        super().__init__(config)
        self._token: str = ""
        self._token_expires: float = 0

    # ── Webhook ──

    def verify_signature(self, raw_body: bytes, headers: dict) -> bool:
        """Check the SHA1 signature attached to a webhook request.

        Args:
            raw_body: Raw request body; not used by this check.
            headers: Request headers. The signature parameters are read from
                the ``x-wx-params`` entry, which is expected to be a mapping
                with ``signature``, ``timestamp`` and ``nonce`` keys.

        Returns:
            True when the signature matches, or when ``x-wx-params`` is
            missing or empty (development-mode pass-through); False
            otherwise.
        """
        import hmac  # local import keeps this block self-contained

        params = headers.get("x-wx-params", {})
        if not params:
            # SECURITY NOTE(review): requests without signature parameters
            # are accepted on purpose (development mode). Confirm that
            # production deployments always populate ``x-wx-params``.
            return True

        signature = params.get("signature", "")
        timestamp = str(params.get("timestamp", ""))
        nonce = str(params.get("nonce", ""))
        expected = self.make_signature(self.config.verify_token, timestamp, nonce)

        if isinstance(signature, str) and isinstance(expected, str):
            # Constant-time comparison so response timing does not reveal
            # how many leading characters of a forged signature matched.
            return hmac.compare_digest(
                signature.encode("utf-8"), expected.encode("utf-8")
            )
        # Unexpected types: keep the plain equality semantics.
        return signature == expected

    def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]:
        """Parse a WeChat XML webhook payload into a ``ChannelMessage``.

        Args:
            raw_body: UTF-8 encoded XML document sent by WeChat.
            headers: Request headers; not used by the parser.

        Returns:
            A single ``ChannelMessage``. ``sender_id`` and
            ``conversation_id`` both carry ``FromUserName``; ``ToUserName``
            is kept in ``extra["to_user"]``.

        Raises:
            UnicodeDecodeError: If ``raw_body`` is not valid UTF-8.
            xml.etree.ElementTree.ParseError: If the payload is not
                well-formed XML.
        """
        # SECURITY NOTE(review): the payload comes from the network and the
        # standard-library ElementTree parser is not hardened against
        # maliciously constructed XML. Consider a hardened parser if this
        # endpoint can be reached by untrusted callers.
        root = ET.fromstring(raw_body.decode("utf-8"))
        text_of = self._xml_text

        msg_type_str = text_of(root, "MsgType") or "text"
        msg_type_map = {
            "text": MessageType.TEXT,
            "image": MessageType.IMAGE,
            "voice": MessageType.VOICE,
            "video": MessageType.VIDEO,
            "location": MessageType.LOCATION,
            "link": MessageType.LINK,
            "event": MessageType.EVENT,
        }
        # Unknown message types are treated as text.
        msg_type = msg_type_map.get(msg_type_str, MessageType.TEXT)

        content = ""
        if msg_type_str == "text":
            content = text_of(root, "Content") or ""
        elif msg_type_str == "image":
            content = "[图片]"
        elif msg_type_str == "voice":
            # Use the speech-recognition transcript when the payload has one.
            content = text_of(root, "Recognition") or "[语音]"

        # A missing or malformed CreateTime must not discard the whole
        # message; fall back to the time of receipt instead.
        create_time = text_of(root, "CreateTime")
        try:
            timestamp = float(create_time) if create_time else time.time()
        except ValueError:
            timestamp = time.time()

        sender = text_of(root, "FromUserName") or ""
        media_id = text_of(root, "MediaId")

        return ChannelMessage(
            msg_id=text_of(root, "MsgId") or "",
            channel=ChannelType.WECHAT_MP,
            msg_type=msg_type,
            content=content,
            sender_id=sender,
            sender_name="",
            timestamp=timestamp,
            conversation_id=sender,
            reply_token="",
            media_url=text_of(root, "PicUrl") or media_id or "",
            media_id=media_id or "",
            extra={
                "to_user": text_of(root, "ToUserName"),
                "msg_type_raw": msg_type_str,
                "event": text_of(root, "Event"),
                "event_key": text_of(root, "EventKey"),
            },
        )

    def build_reply(self, msg: ChannelMessage, reply_text: str) -> str:
        """Build the passive-reply XML returned in the webhook response.

        The reply travels in the opposite direction to the inbound message:
        it is addressed to the original sender and is sent from the account
        that received the message (stored in ``extra["to_user"]`` by
        ``parse_webhook``).

        Args:
            msg: The inbound message being answered.
            reply_text: Text content of the reply.

        Returns:
            The reply as an XML string.
        """
        recipient = msg.sender_id
        # ``to_user`` may be present but None when the inbound payload had
        # no ToUserName element; never render the literal text "None".
        account_id = msg.extra.get("to_user") or ""
        create_time = int(time.time())
        cdata = self._cdata
        return (
            "<xml>"
            f"<ToUserName><![CDATA[{cdata(recipient)}]]></ToUserName>"
            f"<FromUserName><![CDATA[{cdata(account_id)}]]></FromUserName>"
            f"<CreateTime>{create_time}</CreateTime>"
            "<MsgType><![CDATA[text]]></MsgType>"
            f"<Content><![CDATA[{cdata(reply_text)}]]></Content>"
            "</xml>"
        )

    # ── Proactive push ──

    async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult:
        """Send a customer-service text message to a user.

        Args:
            user_id: Recipient identifier, sent as ``touser``.
            content: Text to send.
            msg_type: Accepted for interface compatibility; the payload is
                always sent as a text message.

        Returns:
            ``ReplyResult`` with ``success=True`` when WeChat answers with
            ``errcode`` 0, otherwise a failed result carrying the error
            code and message.

        Raises:
            RuntimeError: If no access token could be obtained.
        """
        payload = {
            "touser": user_id,
            "msgtype": "text",
            "text": {"content": content},
        }
        data = await self._post_custom_message(payload)
        if data.get("errcode") == 0:
            return ReplyResult(success=True, msg_id=str(data.get("msgid", "")))
        return ReplyResult(success=False, error=f"wechat error {data.get('errcode')}: {data.get('errmsg')}")

    async def send_image(self, user_id: str, image_url: str) -> ReplyResult:
        """Send a customer-service image message to a user.

        Args:
            user_id: Recipient identifier, sent as ``touser``.
            image_url: Value sent as the image ``media_id``.
                NOTE(review): despite the parameter name, the value is
                passed through as ``media_id`` without any upload step, so
                callers presumably need to supply a media id here; confirm
                against callers.

        Returns:
            ``ReplyResult`` with ``success=True`` when WeChat answers with
            ``errcode`` 0, otherwise a failed result carrying the raw
            response.

        Raises:
            RuntimeError: If no access token could be obtained.
        """
        payload = {
            "touser": user_id,
            "msgtype": "image",
            "image": {"media_id": image_url},
        }
        data = await self._post_custom_message(payload)
        if data.get("errcode") == 0:
            return ReplyResult(success=True)
        return ReplyResult(success=False, error=str(data))

    async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult:
        """Share a file with a user as a text message containing its link.

        Simplified implementation: nothing is uploaded; the file name and
        URL are sent as plain text through ``send_message``.

        Args:
            user_id: Recipient identifier.
            file_url: Link to the file.
            filename: Display name placed before the link.

        Returns:
            The ``ReplyResult`` produced by ``send_message``.
        """
        return await self.send_message(user_id, f"文件: {filename}\n{file_url}")

    async def _post_custom_message(self, payload: dict) -> dict:
        """POST a customer-service message payload and return the decoded reply.

        If WeChat rejects the cached access token, the cache is cleared and
        the request is retried once with a freshly fetched token.
        """
        # Encode explicitly so non-ASCII text is sent as raw UTF-8 rather
        # than \uXXXX escapes, independent of the HTTP client's defaults.
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        data: dict = {}
        for attempt in range(2):
            token = await self.get_access_token()
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    self._CUSTOM_SEND_URL,
                    params={"access_token": token},
                    content=body,
                    headers={"Content-Type": "application/json"},
                    timeout=self._HTTP_TIMEOUT,
                )
            data = resp.json()
            if attempt == 0 and data.get("errcode") in self._STALE_TOKEN_ERRCODES:
                # Drop the rejected token so the retry fetches a new one.
                self._token = ""
                self._token_expires = 0
                continue
            break
        return data

    # ── Token ──

    async def get_access_token(self) -> str:
        """Return a valid API access token, fetching a new one when needed.

        The cached token is reused until ``_TOKEN_REFRESH_MARGIN`` seconds
        before its recorded expiry.

        Returns:
            The access token string.

        Raises:
            RuntimeError: If the token endpoint response contains no
                ``access_token``.
        """
        if self._token and time.time() < self._token_expires - self._TOKEN_REFRESH_MARGIN:
            return self._token

        # Pass credentials as query parameters so the client URL-encodes
        # them instead of interpolating raw values into the URL.
        query = {
            "grant_type": "client_credential",
            "appid": self.config.app_id,
            "secret": self.config.app_secret,
        }
        async with httpx.AsyncClient() as client:
            resp = await client.get(self._TOKEN_URL, params=query, timeout=self._HTTP_TIMEOUT)
        data = resp.json()
        if "access_token" in data:
            self._token = data["access_token"]
            self._token_expires = time.time() + data.get("expires_in", 7200)
            return self._token
        raise RuntimeError(f"wechat token error: {data}")

    # ── Helpers ──

    @staticmethod
    def _cdata(value: object) -> str:
        """Return ``value`` as text that is safe inside a CDATA section.

        A literal ``]]>`` would terminate the section early, so it is split
        across two adjacent CDATA sections.
        """
        return str(value).replace("]]>", "]]]]><![CDATA[>")

    @staticmethod
    def _xml_text(element: ET.Element, tag: str) -> Optional[str]:
        """Return the text of the first direct child ``tag``, or None if absent."""
        child = element.find(tag)
        return child.text if child is not None else None