Coverage for agentos/channels/adapters/wechat.py: 0%
82 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS Channels — 微信公众号适配器。
4Webhook 规范: https://developers.weixin.qq.com/doc/offiaccount/Message_Management/Receiving_standard_messages.html
6特性:
7 - XML 报文解析
8 - SHA1 签名验证
9 - 被动回复(在 webhook 响应中同步回复)
10 - access_token 管理 + 自动续期
11 - 主动推送(客服消息接口)
12"""
14from __future__ import annotations
16import json
17import time
18import xml.etree.ElementTree as ET
19from typing import Optional
21import httpx
23from agentos.channels.base import (
24 BaseChannelAdapter, ChannelConfig, ReplyResult,
25)
26from agentos.channels.message import ChannelMessage, ChannelType, MessageType
class WeChatAdapter(BaseChannelAdapter):
    """Channel adapter for WeChat Official Accounts.

    Webhook side: signature verification, XML parsing and passive (in-response)
    XML replies. Push side: customer-service messages sent with an access
    token that is cached on the instance and refreshed shortly before expiry.
    """

    channel_type = ChannelType.WECHAT_MP

    _API_BASE = "https://api.weixin.qq.com/cgi-bin"
    # Refresh the cached token this many seconds before its recorded expiry.
    _TOKEN_REFRESH_MARGIN = 300
    # Lifetime assumed when the token response carries no "expires_in".
    _DEFAULT_TOKEN_TTL = 7200
    # Timeout (seconds) applied to every outgoing HTTP request.
    _HTTP_TIMEOUT = 10

    # Raw ``MsgType`` value -> internal message type; unknown values map to TEXT.
    _MSG_TYPE_MAP = {
        "text": MessageType.TEXT,
        "image": MessageType.IMAGE,
        "voice": MessageType.VOICE,
        "video": MessageType.VIDEO,
        "location": MessageType.LOCATION,
        "link": MessageType.LINK,
        "event": MessageType.EVENT,
    }

    def __init__(self, config: ChannelConfig):
        """Store the channel config and start with an empty token cache."""
        super().__init__(config)
        self._token: str = ""
        self._token_expires: float = 0

    # ── Webhook ──

    def verify_signature(self, raw_body: bytes, headers: dict) -> bool:
        """Verify the SHA1 signature attached to a webhook request.

        Args:
            raw_body: Raw request body; not used by this check.
            headers: Request headers. The signature fields are read from the
                ``x-wx-params`` entry, which is expected to be a mapping with
                ``signature``, ``timestamp`` and ``nonce`` keys.

        Returns:
            True when the supplied signature equals the one computed from the
            configured verify token, or when no signature parameters were
            supplied at all.
        """
        import hmac

        params = headers.get("x-wx-params", {})
        if not params:
            # NOTE(security): unsigned requests are accepted ("development
            # mode" in the original code). Anyone who can reach the endpoint
            # can bypass verification by omitting the parameters; confirm this
            # is gated off in production.
            return True

        signature = params.get("signature", "")
        # make_signature is not defined in this class; presumably inherited
        # from BaseChannelAdapter.
        expected = self.make_signature(
            self.config.verify_token,
            str(params.get("timestamp", "")),
            str(params.get("nonce", "")),
        )
        if isinstance(signature, str) and isinstance(expected, str):
            # Constant-time comparison so response timing does not leak how
            # many leading characters of a forged signature were correct.
            return hmac.compare_digest(
                signature.encode("utf-8"), expected.encode("utf-8")
            )
        return signature == expected

    def parse_webhook(self, raw_body: bytes, headers: dict) -> ChannelMessage | list[ChannelMessage]:
        """Parse a WeChat XML webhook payload into a ``ChannelMessage``.

        Args:
            raw_body: UTF-8 encoded XML document.
            headers: Request headers; not used by the parser.

        Returns:
            A single ``ChannelMessage``. The sender's ``FromUserName`` is used
            as both sender id and conversation id; the raw message type, the
            receiving account and any event fields are kept in ``extra``.

        Raises:
            UnicodeDecodeError: If the body is not valid UTF-8.
            xml.etree.ElementTree.ParseError: If the body is not well-formed XML.
        """
        # NOTE(security): the body is untrusted and xml.etree offers no
        # protection against entity-expansion payloads; consider a hardened
        # parser (e.g. defusedxml) if the project can take the dependency.
        root = ET.fromstring(raw_body.decode("utf-8"))

        def field(tag: str) -> Optional[str]:
            return self._xml_text(root, tag)

        raw_type = field("MsgType") or "text"

        if raw_type == "text":
            content = field("Content") or ""
        elif raw_type == "image":
            content = "[图片]"
        elif raw_type == "voice":
            # Prefer the speech-recognition transcript when WeChat supplies one.
            content = field("Recognition") or "[语音]"
        else:
            content = ""

        # A missing or malformed CreateTime falls back to the local clock
        # instead of aborting the whole message with ValueError.
        raw_time = field("CreateTime")
        try:
            timestamp = float(raw_time) if raw_time else time.time()
        except ValueError:
            timestamp = time.time()

        sender = field("FromUserName") or ""
        media_id = field("MediaId") or ""

        return ChannelMessage(
            msg_id=field("MsgId") or "",
            channel=ChannelType.WECHAT_MP,
            msg_type=self._MSG_TYPE_MAP.get(raw_type, MessageType.TEXT),
            content=content,
            sender_id=sender,
            sender_name="",
            timestamp=timestamp,
            conversation_id=sender,
            reply_token="",
            media_url=field("PicUrl") or media_id,
            media_id=media_id,
            extra={
                "to_user": field("ToUserName"),
                "msg_type_raw": raw_type,
                "event": field("Event"),
                "event_key": field("EventKey"),
            },
        )

    def build_reply(self, msg: ChannelMessage, reply_text: str) -> str:
        """Build the passive-reply XML for an incoming message.

        The reply is addressed to the original sender (``msg.sender_id``) and
        sent from the account that received the message (``extra["to_user"]``,
        i.e. the incoming ``ToUserName``). An empty string is used when that
        value is missing.

        Args:
            msg: The incoming message being answered.
            reply_text: Text content of the reply.

        Returns:
            The XML document as a string.
        """
        recipient = msg.sender_id
        account = msg.extra.get("to_user") or ""
        created = int(time.time())
        return (
            "<xml>"
            f"<ToUserName>{self._cdata(recipient)}</ToUserName>"
            f"<FromUserName>{self._cdata(account)}</FromUserName>"
            f"<CreateTime>{created}</CreateTime>"
            "<MsgType><![CDATA[text]]></MsgType>"
            f"<Content>{self._cdata(reply_text)}</Content>"
            "</xml>"
        )

    # ── Push (customer-service messages) ──

    async def send_message(self, user_id: str, content: str, msg_type: str = "text") -> ReplyResult:
        """Send a text customer-service message.

        Args:
            user_id: Value sent as ``touser``.
            content: Text to send.
            msg_type: Accepted for interface compatibility; currently ignored,
                the message is always sent as ``text``.

        Returns:
            A successful ``ReplyResult`` carrying the returned ``msgid`` when
            the response ``errcode`` is 0, otherwise a failed one carrying the
            error code and message.
        """
        data = await self._post_custom_message({
            "touser": user_id,
            "msgtype": "text",
            "text": {"content": content},
        })
        if data.get("errcode") == 0:
            return ReplyResult(success=True, msg_id=str(data.get("msgid", "")))
        return ReplyResult(
            success=False,
            error=f"wechat error {data.get('errcode')}: {data.get('errmsg')}",
        )

    async def send_image(self, user_id: str, image_url: str) -> ReplyResult:
        """Send an image customer-service message.

        Args:
            user_id: Value sent as ``touser``.
            image_url: Sent verbatim as the ``media_id`` field.
                NOTE(review): despite the parameter name this presumably has
                to be an uploaded media id rather than a URL; confirm.

        Returns:
            A successful ``ReplyResult`` when the response ``errcode`` is 0,
            otherwise a failed one whose error is the stringified response.
        """
        data = await self._post_custom_message({
            "touser": user_id,
            "msgtype": "image",
            "image": {"media_id": image_url},
        })
        if data.get("errcode") == 0:
            return ReplyResult(success=True)
        return ReplyResult(success=False, error=str(data))

    async def send_file(self, user_id: str, file_url: str, filename: str) -> ReplyResult:
        """Send a file to a user as a text message containing its link.

        Simplified implementation: no media is uploaded; the file name and
        URL are sent as plain text via ``send_message``.
        """
        return await self.send_message(user_id, f"文件: {filename}\n{file_url}")

    async def _post_custom_message(self, payload: dict) -> dict:
        """POST one customer-service message and return the decoded JSON reply."""
        token = await self.get_access_token()
        # Serialize explicitly with ensure_ascii=False: WeChat is reported to
        # display \uXXXX escapes literally, and whether httpx escapes non-ASCII
        # text in ``json=`` depends on the installed version.
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self._API_BASE}/message/custom/send",
                params={"access_token": token},
                content=body,
                headers={"Content-Type": "application/json"},
                timeout=self._HTTP_TIMEOUT,
            )
        return resp.json()

    # ── Token ──

    async def get_access_token(self) -> str:
        """Return a valid access token, fetching a new one when needed.

        The cached token is reused until ``_TOKEN_REFRESH_MARGIN`` seconds
        before its recorded expiry.

        Raises:
            RuntimeError: If the token endpoint's response has no
                ``access_token`` field.
        """
        if self._token and time.time() < self._token_expires - self._TOKEN_REFRESH_MARGIN:
            return self._token

        async with httpx.AsyncClient() as client:
            # Pass credentials as params so they are URL-encoded rather than
            # spliced into the query string by hand.
            resp = await client.get(
                f"{self._API_BASE}/token",
                params={
                    "grant_type": "client_credential",
                    "appid": self.config.app_id,
                    "secret": self.config.app_secret,
                },
                timeout=self._HTTP_TIMEOUT,
            )
        data = resp.json()
        if "access_token" not in data:
            raise RuntimeError(f"wechat token error: {data}")

        self._token = data["access_token"]
        self._token_expires = time.time() + data.get("expires_in", self._DEFAULT_TOKEN_TTL)
        return self._token

    # ── Helpers ──

    @staticmethod
    def _cdata(value) -> str:
        """Wrap a value in a CDATA section, keeping any ``]]>`` inside it literal."""
        text = "" if value is None else str(value)
        # "]]>" would terminate the section early; split it across two sections.
        return "<![CDATA[" + text.replace("]]>", "]]]]><![CDATA[>") + "]]>"

    @staticmethod
    def _xml_text(element: ET.Element, tag: str) -> Optional[str]:
        """Return the text of the first direct child named ``tag``, or None."""
        child = element.find(tag)
        return child.text if child is not None else None